diff --git a/go.mod b/go.mod index c63ee12316c..1fb24ed3505 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,7 @@ require ( github.com/okzk/sdnotify v0.0.0-20240725214427-1c1fdd37c5ac github.com/prometheus/procfs v0.15.1 github.com/shirou/gopsutil/v4 v4.24.8 - github.com/thanos-io/objstore v0.0.0-20240913165201-fd105025a2e5 + github.com/thanos-io/objstore v0.0.0-20241010161353-f90c89a0ef90 github.com/twmb/franz-go v1.17.1 github.com/twmb/franz-go/pkg/kadm v1.13.0 github.com/twmb/franz-go/pkg/kfake v0.0.0-20240821035758-b77dd13e2bfa diff --git a/go.sum b/go.sum index 7ee8b415164..c5a8cf125ea 100644 --- a/go.sum +++ b/go.sum @@ -1683,8 +1683,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8 github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= -github.com/thanos-io/objstore v0.0.0-20240913165201-fd105025a2e5 h1:sb2s6y+T5+iaNElATi4bpzw2lGMcd5YjAUvxQp9ePtw= -github.com/thanos-io/objstore v0.0.0-20240913165201-fd105025a2e5/go.mod h1:A5Rlc/vdyENE5D0as6+6kp4kxWO72b4R0Q1ay/1d230= +github.com/thanos-io/objstore v0.0.0-20241010161353-f90c89a0ef90 h1:+gDIM0kLg4mpwU4RjvG09KiZsPrCy9EBHf8J/uJ+Ve0= +github.com/thanos-io/objstore v0.0.0-20241010161353-f90c89a0ef90/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= diff --git a/pkg/storage/bucket/azure/bucket_client.go b/pkg/storage/bucket/azure/bucket_client.go index ae2ce1b8986..625a618825d 100644 --- a/pkg/storage/bucket/azure/bucket_client.go +++ b/pkg/storage/bucket/azure/bucket_client.go @@ -6,6 +6,8 @@ package azure import ( + "net/http" + "github.com/go-kit/log" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/azure" @@ -15,7 +17,7 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig) } -func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string) (*azure.Bucket, error)) (objstore.Bucket, error) { +func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) { // Start with default config to make sure that all parameters are set to sensible values, especially // HTTP Config field. bucketConfig := azure.DefaultConfig @@ -31,5 +33,5 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo bucketConfig.Endpoint = cfg.Endpoint } - return factory(logger, bucketConfig, name) + return factory(logger, bucketConfig, name, nil) } diff --git a/pkg/storage/bucket/azure/bucket_client_test.go b/pkg/storage/bucket/azure/bucket_client_test.go index 482b75c99cb..1ccc0bf96f9 100644 --- a/pkg/storage/bucket/azure/bucket_client_test.go +++ b/pkg/storage/bucket/azure/bucket_client_test.go @@ -3,6 +3,7 @@ package azure import ( + "net/http" "testing" "github.com/go-kit/log" @@ -54,7 +55,7 @@ func TestNewBucketClient(t *testing.T) { } // fakeFactory is a test utility to act as an azure.Bucket factory, but in reality verify the input config. -func fakeFactory(t *testing.T, cfg Config) func(log.Logger, azure.Config, string) (*azure.Bucket, error) { +func fakeFactory(t *testing.T, cfg Config) func(log.Logger, azure.Config, string, http.RoundTripper) (*azure.Bucket, error) { expCfg := azure.DefaultConfig expCfg.StorageAccountName = cfg.StorageAccountName expCfg.StorageAccountKey = cfg.StorageAccountKey.String() @@ -66,7 +67,7 @@ func fakeFactory(t *testing.T, cfg Config) func(log.Logger, azure.Config, string expCfg.Endpoint = cfg.Endpoint } - return func(_ log.Logger, azCfg azure.Config, _ string) (*azure.Bucket, error) { + return func(_ log.Logger, azCfg azure.Config, _ string, _ http.RoundTripper) (*azure.Bucket, error) { t.Helper() assert.Equal(t, expCfg, azCfg) diff --git a/pkg/storage/bucket/gcs/bucket_client.go b/pkg/storage/bucket/gcs/bucket_client.go index 6e8dd23698b..675af93a29b 100644 --- a/pkg/storage/bucket/gcs/bucket_client.go +++ b/pkg/storage/bucket/gcs/bucket_client.go @@ -19,5 +19,5 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo Bucket: cfg.BucketName, ServiceAccount: cfg.ServiceAccount.String(), } - return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name) + return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil) } diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index 2a7a643aac2..2a7a7779986 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -25,7 +25,7 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke return nil, err } - return s3.NewBucketWithConfig(logger, s3Cfg, name) + return s3.NewBucketWithConfig(logger, s3Cfg, name, nil) } // NewBucketReaderClient creates a new S3 bucket client @@ -35,7 +35,7 @@ func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore return nil, err } - return s3.NewBucketWithConfig(logger, s3Cfg, name) + return s3.NewBucketWithConfig(logger, s3Cfg, name, nil) } func newS3Config(cfg Config) (s3.Config, error) { diff --git a/pkg/storage/bucket/swift/bucket_client.go b/pkg/storage/bucket/swift/bucket_client.go index 5d13dc63991..04207886100 100644 --- a/pkg/storage/bucket/swift/bucket_client.go +++ b/pkg/storage/bucket/swift/bucket_client.go @@ -50,5 +50,5 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, return nil, err } - return swift.NewContainer(logger, serialized) + return swift.NewContainer(logger, serialized, nil) } diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go index 63bd2c1aadb..c2f3adb517e 100644 --- a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go +++ b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go @@ -6,6 +6,7 @@ package azure import ( "context" "io" + "net/http" "os" "strings" "testing" @@ -145,7 +146,7 @@ type Bucket struct { } // NewBucket returns a new Bucket using the provided Azure config. -func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) { +func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) { level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component) conf, err := parseConfig(azureConfig) if err != nil { @@ -154,11 +155,14 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket if conf.MSIResource != "" { level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set") } - return NewBucketWithConfig(logger, conf, component) + return NewBucketWithConfig(logger, conf, component, rt) } // NewBucketWithConfig returns a new Bucket using the provided Azure config struct. -func NewBucketWithConfig(logger log.Logger, conf Config, component string) (*Bucket, error) { +func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) { + if rt != nil { + conf.HTTPConfig.Transport = rt + } if err := conf.validate(); err != nil { return nil, err } @@ -355,7 +359,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err if err != nil { return nil, nil, err } - bkt, err := NewBucket(log.NewNopLogger(), bc, component) + bkt, err := NewBucket(log.NewNopLogger(), bc, component, nil) if err != nil { t.Errorf("Cannot create Azure storage container:") return nil, nil, err diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/helpers.go b/vendor/github.com/thanos-io/objstore/providers/azure/helpers.go index 846394a08a7..2915fbbbc83 100644 --- a/vendor/github.com/thanos-io/objstore/providers/azure/helpers.go +++ b/vendor/github.com/thanos-io/objstore/providers/azure/helpers.go @@ -20,10 +20,14 @@ import ( const DirDelim = "/" func getContainerClient(conf Config) (*container.Client, error) { - dt, err := exthttp.DefaultTransport(conf.HTTPConfig) + var rt http.RoundTripper + rt, err := exthttp.DefaultTransport(conf.HTTPConfig) if err != nil { return nil, err } + if conf.HTTPConfig.Transport != nil { + rt = conf.HTTPConfig.Transport + } opt := &container.ClientOptions{ ClientOptions: azcore.ClientOptions{ Retry: policy.RetryOptions{ @@ -35,7 +39,7 @@ func getContainerClient(conf Config) (*container.Client, error) { Telemetry: policy.TelemetryOptions{ ApplicationID: "Thanos", }, - Transport: &http.Client{Transport: dt}, + Transport: &http.Client{Transport: rt}, }, } diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go index 87e31f66bac..b5dae200554 100644 --- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go +++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go @@ -52,7 +52,8 @@ type Config struct { // ChunkSizeBytes controls the maximum number of bytes of the object that the // Writer will attempt to send to the server in a single request // Used as storage.Writer.ChunkSize of https://pkg.go.dev/google.golang.org/cloud/storage#Writer - ChunkSizeBytes int `yaml:"chunk_size_bytes"` + ChunkSizeBytes int `yaml:"chunk_size_bytes"` + noAuth bool `yaml:"no_auth"` } // Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS. @@ -76,20 +77,22 @@ func parseConfig(conf []byte) (Config, error) { } // NewBucket returns a new Bucket against the given bucket handle. -func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) { +func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) { config, err := parseConfig(conf) if err != nil { return nil, err } - return NewBucketWithConfig(ctx, logger, config, component) + return NewBucketWithConfig(ctx, logger, config, component, rt) } // NewBucketWithConfig returns a new Bucket with gcs Config struct. -func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string) (*Bucket, error) { +func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) { if gc.Bucket == "" { return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks") } - + if rt != nil { + gc.HTTPConfig.Transport = rt + } var opts []option.ClientOption // If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic. @@ -100,7 +103,9 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp } opts = append(opts, option.WithCredentials(credentials)) } - + if gc.noAuth { + opts = append(opts, option.WithoutAuthentication()) + } opts = append(opts, option.WithUserAgent(fmt.Sprintf("thanos-%s/%s (%s)", component, version.Version, runtime.Version())), ) @@ -120,14 +125,12 @@ func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOp // Check if a roundtripper has been set in the config // otherwise build the default transport. var rt http.RoundTripper + rt, err := exthttp.DefaultTransport(gc.HTTPConfig) + if err != nil { + return nil, err + } if gc.HTTPConfig.Transport != nil { rt = gc.HTTPConfig.Transport - } else { - var err error - rt, err = exthttp.DefaultTransport(gc.HTTPConfig) - if err != nil { - return nil, err - } } // GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call @@ -312,7 +315,7 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error return nil, nil, err } - b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test") + b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test", nil) if err != nil { return nil, nil, err } diff --git a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go index afd0982dc2e..2f0447a7961 100644 --- a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go +++ b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go @@ -15,6 +15,7 @@ import ( "strings" "testing" + "github.com/efficientgo/core/logerrcapture" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/minio/minio-go/v7" @@ -175,13 +176,13 @@ func parseConfig(conf []byte) (Config, error) { } // NewBucket returns a new Bucket using the provided s3 config values. -func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) { +func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) { config, err := parseConfig(conf) if err != nil { return nil, err } - return NewBucketWithConfig(logger, config, component) + return NewBucketWithConfig(logger, config, component, rt) } type overrideSignerType struct { @@ -201,7 +202,7 @@ func (s *overrideSignerType) Retrieve() (credentials.Value, error) { } // NewBucketWithConfig returns a new Bucket using the provided s3 config values. -func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) { +func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) { var chain []credentials.Provider // TODO(bwplotka): Don't do flags as they won't scale, use actual params like v2, v4 instead @@ -241,25 +242,25 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B }), } } - + if rt != nil { + config.HTTPConfig.Transport = rt + } // Check if a roundtripper has been set in the config // otherwise build the default transport. - var rt http.RoundTripper + var tpt http.RoundTripper + tpt, err := exthttp.DefaultTransport(config.HTTPConfig) + if err != nil { + return nil, err + } if config.HTTPConfig.Transport != nil { - rt = config.HTTPConfig.Transport - } else { - var err error - rt, err = exthttp.DefaultTransport(config.HTTPConfig) - if err != nil { - return nil, err - } + tpt = config.HTTPConfig.Transport } client, err := minio.New(config.Endpoint, &minio.Options{ Creds: credentials.NewChainCredentials(chain), Secure: !config.Insecure, Region: config.Region, - Transport: rt, + Transport: tpt, BucketLookup: config.BucketLookupType.MinioType(), }) if err != nil { @@ -437,12 +438,21 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( return nil, err } } + r, err := b.client.GetObject(ctx, b.name, name, *opts) + if err != nil { + return nil, err + } + + // NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here + // for convenience. + if _, err := r.Read(nil); err != nil { + defer logerrcapture.Do(b.logger, r.Close, "s3 get range obj close") - // StatObject to see if the object exists and we have permissions to read it - if _, err := b.client.StatObject(ctx, b.name, name, *opts); err != nil { + // First GET Object request error. return nil, err } - return b.client.GetObject(ctx, b.name, name, *opts) + + return r, nil } // Get returns a reader for the given object name. @@ -601,7 +611,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke if err != nil { return nil, nil, err } - b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test") + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", nil) if err != nil { return nil, nil, err } diff --git a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go index a8c56c55884..682c494cab2 100644 --- a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go +++ b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go @@ -154,12 +154,12 @@ type Container struct { segmentsContainer string } -func NewContainer(logger log.Logger, conf []byte) (*Container, error) { +func NewContainer(logger log.Logger, conf []byte, rt http.RoundTripper) (*Container, error) { sc, err := parseConfig(conf) if err != nil { return nil, errors.Wrap(err, "parse config") } - return NewContainerFromConfig(logger, sc, false) + return NewContainerFromConfig(logger, sc, false, rt) } func ensureContainer(connection *swift.Connection, name string, createIfNotExist bool) error { @@ -178,22 +178,22 @@ func ensureContainer(connection *swift.Connection, name string, createIfNotExist return nil } -func NewContainerFromConfig(logger log.Logger, sc *Config, createContainer bool) (*Container, error) { - +func NewContainerFromConfig(logger log.Logger, sc *Config, createContainer bool, rt http.RoundTripper) (*Container, error) { + if rt != nil { + sc.HTTPConfig.Transport = rt + } // Check if a roundtripper has been set in the config // otherwise build the default transport. - var rt http.RoundTripper + var tpt http.RoundTripper + tpt, err := exthttp.DefaultTransport(sc.HTTPConfig) + if err != nil { + return nil, err + } if sc.HTTPConfig.Transport != nil { - rt = sc.HTTPConfig.Transport - } else { - var err error - rt, err = exthttp.DefaultTransport(sc.HTTPConfig) - if err != nil { - return nil, err - } + tpt = sc.HTTPConfig.Transport } - connection := connectionFromConfig(sc, rt) + connection := connectionFromConfig(sc, tpt) if err := connection.Authenticate(); err != nil { return nil, errors.Wrap(err, "authentication") } @@ -378,7 +378,7 @@ func NewTestContainer(t testing.TB) (objstore.Bucket, func(), error) { "needs to be manually cleared. This means that it is only useful to run one test in a time. This is due " + "to safety (accidentally pointing prod container for test) as well as swift not being fully strong consistent.") } - c, err := NewContainerFromConfig(log.NewNopLogger(), config, false) + c, err := NewContainerFromConfig(log.NewNopLogger(), config, false, nil) if err != nil { return nil, nil, errors.Wrap(err, "initializing new container") } @@ -392,7 +392,7 @@ func NewTestContainer(t testing.TB) (objstore.Bucket, func(), error) { } config.ContainerName = objstore.CreateTemporaryTestBucketName(t) config.SegmentContainerName = config.ContainerName - c, err := NewContainerFromConfig(log.NewNopLogger(), config, true) + c, err := NewContainerFromConfig(log.NewNopLogger(), config, true, nil) if err != nil { return nil, nil, errors.Wrap(err, "initializing new container") } diff --git a/vendor/modules.txt b/vendor/modules.txt index 6fb4825ea84..a25b4bf8ab3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1129,7 +1129,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/objstore v0.0.0-20240913165201-fd105025a2e5 +# github.com/thanos-io/objstore v0.0.0-20241010161353-f90c89a0ef90 ## explicit; go 1.21 github.com/thanos-io/objstore github.com/thanos-io/objstore/exthttp