From 86237588916a5b94dc5b95ccbac586697e06274a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 15 Jan 2025 18:34:05 +0800 Subject: [PATCH 1/5] external storage: use ListObjectsV2 and handle S3 express one zone Signed-off-by: lance6716 --- br/pkg/storage/s3.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 00a90f55c3400..5e4779932f278 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -311,6 +311,9 @@ func createOssRAMCred() (*credentials.Credentials, error) { return newCred, nil } +// see https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-APIs.html +var s3ExpressEndpointRE = regexp.MustCompile(`s3express.*amazonaws\.com`) + // NewS3Storage initialize a new s3 storage for metadata. func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3Storage, errRet error) { qs := *backend @@ -329,8 +332,19 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStora request.WithRetryer(awsConfig, defaultS3Retryer()) } + skipAutoDetectAWSRegion := false if qs.Endpoint != "" { awsConfig.WithEndpoint(qs.Endpoint) + if s3ExpressEndpointRE.MatchString(qs.Endpoint) { + log.Info("found S3 Express One Zone endpoint", zap.String("endpoint", qs.Endpoint)) + if qs.Region == "" { + return nil, errors.Errorf("must specify region for S3 Express One Zone endpoint") + } + awsConfig. + WithS3DisableContentMD5Validation(true). + WithS3ForcePathStyle(false) + skipAutoDetectAWSRegion = true + } } if opts.HTTPClient != nil { awsConfig.WithHTTPClient(opts.HTTPClient) @@ -383,7 +397,7 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStora c := s3.New(ses, s3CliConfigs...) var region string - if len(qs.Provider) == 0 || qs.Provider == "aws" { + if (len(qs.Provider) == 0 || qs.Provider == "aws") && !skipAutoDetectAWSRegion { confCred := ses.Config.Credentials setCredOpt := func(req *request.Request) { // s3manager.GetBucketRegionWithClient will set credential anonymous, which works with s3. @@ -699,17 +713,14 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin if opt.ListCount > 0 { maxKeys = opt.ListCount } - req := &s3.ListObjectsInput{ + req := &s3.ListObjectsV2Input{ Bucket: aws.String(rs.options.Bucket), Prefix: aws.String(prefix), MaxKeys: aws.Int64(maxKeys), } for { - // FIXME: We can't use ListObjectsV2, it is not universally supported. - // (Ceph RGW supported ListObjectsV2 since v15.1.0, released 2020 Jan 30th) - // (as of 2020, DigitalOcean Spaces still does not support V2 - https://developers.digitalocean.com/documentation/spaces/#list-bucket-contents) - res, err := rs.svc.ListObjectsWithContext(ctx, req) + res, err := rs.svc.ListObjectsV2WithContext(ctx, req) if err != nil { return errors.Trace(err) } @@ -723,7 +734,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin // "If response does not include the NextMarker and it is truncated, // you can use the value of the last Key in the response as the marker // in the subsequent request to get the next set of object keys." - req.Marker = r.Key + req.StartAfter = r.Key // when walk on specify directory, the result include storage.Prefix, // which can not be reuse in other API(Open/Read) directly. From cb0b29499e167f43ffe58c190f98f96f4afa5b5d Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 15 Jan 2025 20:02:36 +0800 Subject: [PATCH 2/5] fix CI Signed-off-by: lance6716 --- br/pkg/storage/s3_test.go | 48 +++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index 0f28b9514feb3..fee591a82f6c8 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -1019,74 +1019,74 @@ func TestWalkDir(t *testing.T) { // first call serve item #0, #1; second call #2, #3; third call #4. firstCall := s.s3.EXPECT(). - ListObjectsWithContext(ctx, gomock.Any()). - DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) { + ListObjectsV2WithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsV2Input, opt ...request.Option) (*s3.ListObjectsV2Output, error) { require.Equal(t, "bucket", aws.StringValue(input.Bucket)) require.Equal(t, "prefix/sp/", aws.StringValue(input.Prefix)) - require.Equal(t, "", aws.StringValue(input.Marker)) + require.Equal(t, "", aws.StringValue(input.StartAfter)) require.Equal(t, int64(2), aws.Int64Value(input.MaxKeys)) require.Equal(t, "", aws.StringValue(input.Delimiter)) - return &s3.ListObjectsOutput{ + return &s3.ListObjectsV2Output{ IsTruncated: aws.Bool(true), Contents: contents[:2], }, nil }) secondCall := s.s3.EXPECT(). - ListObjectsWithContext(ctx, gomock.Any()). - DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) { - require.Equal(t, aws.StringValue(contents[1].Key), aws.StringValue(input.Marker)) + ListObjectsV2WithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsV2Input, opt ...request.Option) (*s3.ListObjectsV2Output, error) { + require.Equal(t, aws.StringValue(contents[1].Key), aws.StringValue(input.StartAfter)) require.Equal(t, int64(2), aws.Int64Value(input.MaxKeys)) - return &s3.ListObjectsOutput{ + return &s3.ListObjectsV2Output{ IsTruncated: aws.Bool(true), Contents: contents[2:4], }, nil }). After(firstCall) thirdCall := s.s3.EXPECT(). - ListObjectsWithContext(ctx, gomock.Any()). - DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) { - require.Equal(t, aws.StringValue(contents[3].Key), aws.StringValue(input.Marker)) + ListObjectsV2WithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsV2Input, opt ...request.Option) (*s3.ListObjectsV2Output, error) { + require.Equal(t, aws.StringValue(contents[3].Key), aws.StringValue(input.StartAfter)) require.Equal(t, int64(2), aws.Int64Value(input.MaxKeys)) - return &s3.ListObjectsOutput{ + return &s3.ListObjectsV2Output{ IsTruncated: aws.Bool(false), Contents: contents[4:], }, nil }). After(secondCall) fourthCall := s.s3.EXPECT(). - ListObjectsWithContext(ctx, gomock.Any()). - DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) { + ListObjectsV2WithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsV2Input, opt ...request.Option) (*s3.ListObjectsV2Output, error) { require.Equal(t, "bucket", aws.StringValue(input.Bucket)) require.Equal(t, "prefix/", aws.StringValue(input.Prefix)) - require.Equal(t, "", aws.StringValue(input.Marker)) + require.Equal(t, "", aws.StringValue(input.StartAfter)) require.Equal(t, int64(4), aws.Int64Value(input.MaxKeys)) require.Equal(t, "", aws.StringValue(input.Delimiter)) - return &s3.ListObjectsOutput{ + return &s3.ListObjectsV2Output{ IsTruncated: aws.Bool(true), Contents: contents[:4], }, nil }). After(thirdCall) fifthCall := s.s3.EXPECT(). - ListObjectsWithContext(ctx, gomock.Any()). - DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) { - require.Equal(t, aws.StringValue(contents[3].Key), aws.StringValue(input.Marker)) + ListObjectsV2WithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsV2Input, opt ...request.Option) (*s3.ListObjectsV2Output, error) { + require.Equal(t, aws.StringValue(contents[3].Key), aws.StringValue(input.StartAfter)) require.Equal(t, int64(4), aws.Int64Value(input.MaxKeys)) - return &s3.ListObjectsOutput{ + return &s3.ListObjectsV2Output{ IsTruncated: aws.Bool(false), Contents: contents[4:], }, nil }). After(fourthCall) s.s3.EXPECT(). - ListObjectsWithContext(ctx, gomock.Any()). - DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) { + ListObjectsV2WithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsV2Input, opt ...request.Option) (*s3.ListObjectsV2Output, error) { require.Equal(t, "bucket", aws.StringValue(input.Bucket)) require.Equal(t, "prefix/sp/1", aws.StringValue(input.Prefix)) - require.Equal(t, "", aws.StringValue(input.Marker)) + require.Equal(t, "", aws.StringValue(input.StartAfter)) require.Equal(t, int64(3), aws.Int64Value(input.MaxKeys)) require.Equal(t, "", aws.StringValue(input.Delimiter)) - return &s3.ListObjectsOutput{ + return &s3.ListObjectsV2Output{ IsTruncated: aws.Bool(false), Contents: contents[2:], }, nil From 51ac2b2f63e2d2dec5b56beab5225359702243d3 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 17 Jan 2025 10:03:17 +0800 Subject: [PATCH 3/5] fix CI Signed-off-by: lance6716 --- br/pkg/storage/s3_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index fee591a82f6c8..1eb54831ad7ae 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -1168,27 +1168,27 @@ func TestWalkDirWithEmptyPrefix(t *testing.T) { }, } firstCall := s3API.EXPECT(). - ListObjectsWithContext(ctx, gomock.Any()). - DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) { + ListObjectsV2WithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsV2Input, opt ...request.Option) (*s3.ListObjectsV2Output, error) { require.Equal(t, "bucket", aws.StringValue(input.Bucket)) require.Equal(t, "", aws.StringValue(input.Prefix)) - require.Equal(t, "", aws.StringValue(input.Marker)) + require.Equal(t, "", aws.StringValue(input.StartAfter)) require.Equal(t, int64(2), aws.Int64Value(input.MaxKeys)) require.Equal(t, "", aws.StringValue(input.Delimiter)) - return &s3.ListObjectsOutput{ + return &s3.ListObjectsV2Output{ IsTruncated: aws.Bool(false), Contents: contents, }, nil }) s3API.EXPECT(). - ListObjectsWithContext(ctx, gomock.Any()). - DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) { + ListObjectsV2WithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsV2Input, opt ...request.Option) (*s3.ListObjectsV2Output, error) { require.Equal(t, "bucket", aws.StringValue(input.Bucket)) require.Equal(t, "sp/", aws.StringValue(input.Prefix)) - require.Equal(t, "", aws.StringValue(input.Marker)) + require.Equal(t, "", aws.StringValue(input.StartAfter)) require.Equal(t, int64(2), aws.Int64Value(input.MaxKeys)) require.Equal(t, "", aws.StringValue(input.Delimiter)) - return &s3.ListObjectsOutput{ + return &s3.ListObjectsV2Output{ IsTruncated: aws.Bool(false), Contents: contents[:1], }, nil From 052f53e708413f24d0b87c909cd8e4246c7bbcec Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 17 Jan 2025 11:33:17 +0800 Subject: [PATCH 4/5] add UT Signed-off-by: lance6716 --- br/pkg/storage/s3_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index 1eb54831ad7ae..9f5fdcd390573 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -1472,3 +1472,22 @@ func TestS3ReadFileRetryable(t *testing.T) { require.Error(t, err) require.True(t, strings.Contains(err.Error(), errMsg)) } + +func TestS3ExpressEndpoint(t *testing.T) { + ctx := context.Background() + _, err := NewS3Storage(ctx, &backuppb.S3{ + Endpoint: "https://s3express-use1-az6.us-east-1.amazonaws.com", + Bucket: "xxxxxxx-global-sort-test--use1-az6--x-s3", + Region: "us-east-1", + ForcePathStyle: true, + }, &ExternalStorageOptions{}) + require.NoError(t, err) + // if the endpoint is not S3 express, this function will fail because no network for auto adjust region. + _, err = NewS3Storage(ctx, &backuppb.S3{ + Endpoint: "https://us-east-1.amazonaws.com", + Bucket: "xxxxxxx-global-sort-test--use1-az6--x-s3", + Region: "us-east-1", + ForcePathStyle: true, + }, &ExternalStorageOptions{}) + require.Error(t, err) +} From 5bc83d39ef68f926336c16c17fd62a4e74e62e50 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 17 Jan 2025 14:30:05 +0800 Subject: [PATCH 5/5] change API Signed-off-by: lance6716 --- br/pkg/storage/parse_test.go | 3 ++- br/pkg/storage/s3.go | 31 +++++++++++++++---------------- br/pkg/storage/s3_test.go | 5 +++-- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/br/pkg/storage/parse_test.go b/br/pkg/storage/parse_test.go index 0669564961c77..384739472b3e6 100644 --- a/br/pkg/storage/parse_test.go +++ b/br/pkg/storage/parse_test.go @@ -56,7 +56,7 @@ func TestCreateStorage(t *testing.T) { require.Equal(t, "https://s3.example.com", s3.Endpoint) require.False(t, s3.ForcePathStyle) - s, err = ParseBackend("ks3://bucket2/prefix/", s3opt) + s, err = ParseBackend("ks3://bucket2/prefix/?storage-class=express-one-zone", s3opt) require.NoError(t, err) s3 = s.GetS3() require.NotNil(t, s3) @@ -65,6 +65,7 @@ func TestCreateStorage(t *testing.T) { require.Equal(t, "https://s3.example.com", s3.Endpoint) require.Equal(t, ks3SDKProvider, s3.Provider) require.False(t, s3.ForcePathStyle) + require.Equal(t, "express-one-zone", s3.StorageClass) // nolint:lll s, err = ParseBackend(`s3://bucket3/prefix/path?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc`, nil) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index e785537c39c51..b9671d0a9c9f0 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -51,7 +51,10 @@ const ( s3ProviderOption = "s3.provider" s3RoleARNOption = "s3.role-arn" s3ExternalIDOption = "s3.external-id" - notFound = "NotFound" + + storageExpressOneZone = "express-one-zone" + + notFound = "NotFound" // number of retries to make of operations. maxRetries = 7 // max number of retries when meets error @@ -330,9 +333,6 @@ func createOssRAMCred() (*credentials.Credentials, error) { return newCred, nil } -// see https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-APIs.html -var s3ExpressEndpointRE = regexp.MustCompile(`s3express.*amazonaws\.com`) - // NewS3Storage initialize a new s3 storage for metadata. func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3Storage, errRet error) { qs := *backend @@ -351,19 +351,18 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStora request.WithRetryer(awsConfig, defaultS3Retryer()) } - skipAutoDetectAWSRegion := false + if qs.StorageClass == storageExpressOneZone { + if qs.Endpoint == "" { + return nil, errors.Errorf("must specify endpoint for S3 Express One Zone storage class") + } + if qs.Region == "" { + return nil, errors.Errorf("must specify region for S3 Express One Zone storage class") + } + awsConfig.WithS3DisableContentMD5Validation(true). + WithS3ForcePathStyle(false) + } if qs.Endpoint != "" { awsConfig.WithEndpoint(qs.Endpoint) - if s3ExpressEndpointRE.MatchString(qs.Endpoint) { - log.Info("found S3 Express One Zone endpoint", zap.String("endpoint", qs.Endpoint)) - if qs.Region == "" { - return nil, errors.Errorf("must specify region for S3 Express One Zone endpoint") - } - awsConfig. - WithS3DisableContentMD5Validation(true). - WithS3ForcePathStyle(false) - skipAutoDetectAWSRegion = true - } } if opts.HTTPClient != nil { awsConfig.WithHTTPClient(opts.HTTPClient) @@ -416,7 +415,7 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStora c := s3.New(ses, s3CliConfigs...) var region string - if (len(qs.Provider) == 0 || qs.Provider == "aws") && !skipAutoDetectAWSRegion { + if (len(qs.Provider) == 0 || qs.Provider == "aws") && qs.StorageClass != storageExpressOneZone { confCred := ses.Config.Credentials setCredOpt := func(req *request.Request) { // s3manager.GetBucketRegionWithClient will set credential anonymous, which works with s3. diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index 9f5fdcd390573..794f77c02202d 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -1480,11 +1480,12 @@ func TestS3ExpressEndpoint(t *testing.T) { Bucket: "xxxxxxx-global-sort-test--use1-az6--x-s3", Region: "us-east-1", ForcePathStyle: true, + StorageClass: "express-one-zone", }, &ExternalStorageOptions{}) require.NoError(t, err) - // if the endpoint is not S3 express, this function will fail because no network for auto adjust region. + // if storage class is not S3 express, this function will fail because no network for auto adjust region. _, err = NewS3Storage(ctx, &backuppb.S3{ - Endpoint: "https://us-east-1.amazonaws.com", + Endpoint: "https://s3express-use1-az6.us-east-1.amazonaws.com", Bucket: "xxxxxxx-global-sort-test--use1-az6--x-s3", Region: "us-east-1", ForcePathStyle: true,