diff --git a/client/factory.go b/client/factory.go index 9e1adec9..bd345024 100644 --- a/client/factory.go +++ b/client/factory.go @@ -6,6 +6,7 @@ package client import ( "context" "fmt" + "net/http" "strings" "github.com/thanos-io/objstore" @@ -49,7 +50,7 @@ type BucketConfig struct { // NewBucket initializes and returns new object storage clients. // NOTE: confContentYaml can contain secrets. -func NewBucket(logger log.Logger, confContentYaml []byte, component string) (objstore.Bucket, error) { +func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt http.RoundTripper) (objstore.Bucket, error) { level.Info(logger).Log("msg", "loading bucket configuration") bucketConf := &BucketConfig{} if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil { @@ -64,23 +65,23 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string) (obj var bucket objstore.Bucket switch strings.ToUpper(string(bucketConf.Type)) { case string(GCS): - bucket, err = gcs.NewBucket(context.Background(), logger, config, component) + bucket, err = gcs.NewBucket(context.Background(), logger, config, component, rt) case string(S3): - bucket, err = s3.NewBucket(logger, config, component) + bucket, err = s3.NewBucket(logger, config, component, rt) case string(AZURE): - bucket, err = azure.NewBucket(logger, config, component) + bucket, err = azure.NewBucket(logger, config, component, rt) case string(SWIFT): - bucket, err = swift.NewContainer(logger, config) + bucket, err = swift.NewContainer(logger, config, rt) case string(COS): - bucket, err = cos.NewBucket(logger, config, component) + bucket, err = cos.NewBucket(logger, config, component, rt) case string(ALIYUNOSS): - bucket, err = oss.NewBucket(logger, config, component) + bucket, err = oss.NewBucket(logger, config, component, rt) case string(FILESYSTEM): bucket, err = filesystem.NewBucketFromConfig(config) case string(BOS): bucket, err = bos.NewBucket(logger, config, component) case string(OCI): - bucket, err = oci.NewBucket(logger, config) + bucket, err = oci.NewBucket(logger, config, rt) case string(OBS): bucket, err = obs.NewBucket(logger, config) default: diff --git a/client/factory_test.go b/client/factory_test.go index 4a9cf879..cd008add 100644 --- a/client/factory_test.go +++ b/client/factory_test.go @@ -23,7 +23,7 @@ func ExampleBucket() { } // Create a new bucket. - bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example") + bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil) if err != nil { panic(err) } @@ -46,7 +46,7 @@ func ExampleTracingBucketUsingOpenTracing() { //nolint:govet } // Create a new bucket. - bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example") + bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil) if err != nil { panic(err) } @@ -72,7 +72,7 @@ func ExampleTracingBucketUsingOpenTelemetry() { //nolint:govet } // Create a new bucket. - bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example") + bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil) if err != nil { panic(err) } diff --git a/errutil/rt_error.go b/errutil/rt_error.go new file mode 100644 index 00000000..ad1309e1 --- /dev/null +++ b/errutil/rt_error.go @@ -0,0 +1,12 @@ +package errutil + +import "net/http" + +// ErrorRoundTripper is a custom RoundTripper that always returns an error. +type ErrorRoundTripper struct { + Err error +} + +func (ert *ErrorRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { + return nil, ert.Err +} diff --git a/go.mod b/go.mod index cb95d387..db189c20 100644 --- a/go.mod +++ b/go.mod @@ -88,6 +88,7 @@ require ( github.com/prometheus/procfs v0.11.1 // indirect github.com/rs/xid v1.5.0 // indirect github.com/sony/gobreaker v0.5.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect diff --git a/go.sum b/go.sum index 0b5b860b..6e22dfab 100644 --- a/go.sum +++ b/go.sum @@ -210,8 +210,9 @@ github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg= github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/providers/azure/azure.go b/providers/azure/azure.go index 63bd2c1a..c2f3adb5 100644 --- a/providers/azure/azure.go +++ b/providers/azure/azure.go @@ -6,6 +6,7 @@ package azure import ( "context" "io" + "net/http" "os" "strings" "testing" @@ -145,7 +146,7 @@ type Bucket struct { } // NewBucket returns a new Bucket using the provided Azure config. -func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) { +func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) { level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component) conf, err := parseConfig(azureConfig) if err != nil { @@ -154,11 +155,14 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket if conf.MSIResource != "" { level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set") } - return NewBucketWithConfig(logger, conf, component) + return NewBucketWithConfig(logger, conf, component, rt) } // NewBucketWithConfig returns a new Bucket using the provided Azure config struct. -func NewBucketWithConfig(logger log.Logger, conf Config, component string) (*Bucket, error) { +func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) { + if rt != nil { + conf.HTTPConfig.Transport = rt + } if err := conf.validate(); err != nil { return nil, err } @@ -355,7 +359,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err if err != nil { return nil, nil, err } - bkt, err := NewBucket(log.NewNopLogger(), bc, component) + bkt, err := NewBucket(log.NewNopLogger(), bc, component, nil) if err != nil { t.Errorf("Cannot create Azure storage container:") return nil, nil, err diff --git a/providers/azure/azure_test.go b/providers/azure/azure_test.go index c49695ab..85533eaa 100644 --- a/providers/azure/azure_test.go +++ b/providers/azure/azure_test.go @@ -8,7 +8,10 @@ import ( "time" "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/objstore/errutil" "github.com/thanos-io/objstore/exthttp" ) @@ -20,7 +23,7 @@ type TestCase struct { } var validConfig = []byte(`storage_account: "myStorageAccount" -storage_account_key: "abc123" +storage_account_key: "bXlTdXBlclNlY3JldEtleTEyMyFAIw==" container: "MyContainer" endpoint: "blob.core.windows.net" reader_config: @@ -222,3 +225,16 @@ http_config: testutil.Ok(t, err) testutil.Equals(t, true, transport.TLSClientConfig.InsecureSkipVerify) } + +func TestNewBucketWithErrorRoundTripper(t *testing.T) { + cfg, err := parseConfig(validConfig) + testutil.Ok(t, err) + + rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")} + + _, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", rt) + + // We expect an error from the RoundTripper + testutil.NotOk(t, err) + testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err) +} diff --git a/providers/azure/helpers.go b/providers/azure/helpers.go index 846394a0..2915fbbb 100644 --- a/providers/azure/helpers.go +++ b/providers/azure/helpers.go @@ -20,10 +20,14 @@ import ( const DirDelim = "/" func getContainerClient(conf Config) (*container.Client, error) { - dt, err := exthttp.DefaultTransport(conf.HTTPConfig) + var rt http.RoundTripper + rt, err := exthttp.DefaultTransport(conf.HTTPConfig) if err != nil { return nil, err } + if conf.HTTPConfig.Transport != nil { + rt = conf.HTTPConfig.Transport + } opt := &container.ClientOptions{ ClientOptions: azcore.ClientOptions{ Retry: policy.RetryOptions{ @@ -35,7 +39,7 @@ func getContainerClient(conf Config) (*container.Client, error) { Telemetry: policy.TelemetryOptions{ ApplicationID: "Thanos", }, - Transport: &http.Client{Transport: dt}, + Transport: &http.Client{Transport: rt}, }, } diff --git a/providers/bos/bos.go b/providers/bos/bos.go index 72e1b1e0..74f9688e 100644 --- a/providers/bos/bos.go +++ b/providers/bos/bos.go @@ -66,6 +66,7 @@ func parseConfig(conf []byte) (Config, error) { // NewBucket new bos bucket. func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) { + // TODO(https://github.com/thanos-io/objstore/pull/140): Add support for custom roundtripper. if logger == nil { logger = log.NewNopLogger() } diff --git a/providers/cos/cos.go b/providers/cos/cos.go index e518cae2..a8b853e3 100644 --- a/providers/cos/cos.go +++ b/providers/cos/cos.go @@ -95,7 +95,7 @@ func parseConfig(conf []byte) (Config, error) { } // NewBucket returns a new Bucket using the provided cos configuration. -func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) { +func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) { if logger == nil { logger = log.NewNopLogger() } @@ -104,12 +104,11 @@ func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error if err != nil { return nil, errors.Wrap(err, "parsing cos configuration") } - - return NewBucketWithConfig(logger, config, component) + return NewBucketWithConfig(logger, config, component, rt) } // NewBucketWithConfig returns a new Bucket using the provided cos config values. -func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) { +func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) { if err := config.validate(); err != nil { return nil, errors.Wrap(err, "validate cos configuration") } @@ -128,7 +127,14 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B } } b := &cos.BaseURL{BucketURL: bucketURL} - tpt, _ := exthttp.DefaultTransport(config.HTTPConfig) + var tpt http.RoundTripper + tpt, err = exthttp.DefaultTransport(config.HTTPConfig) + if err != nil { + return nil, err + } + if rt != nil { + tpt = rt + } client := cos.NewClient(b, &http.Client{ Transport: &cos.AuthorizationTransport{ SecretID: config.SecretId, @@ -485,7 +491,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { return nil, nil, err } - b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test") + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", nil) if err != nil { return nil, nil, err } @@ -506,7 +512,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { return nil, nil, err } - b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test") + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", nil) if err != nil { return nil, nil, err } diff --git a/providers/cos/cos_test.go b/providers/cos/cos_test.go index 6d4316f2..f682aee9 100644 --- a/providers/cos/cos_test.go +++ b/providers/cos/cos_test.go @@ -4,12 +4,16 @@ package cos import ( + "context" "testing" "time" "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/thanos-io/objstore/errutil" "github.com/thanos-io/objstore/exthttp" ) @@ -137,3 +141,21 @@ func TestConfig_validate(t *testing.T) { }) } } + +func TestNewBucketWithErrorRoundTripper(t *testing.T) { + config := Config{ + Bucket: "bucket", + AppId: "123", + Region: "test", + SecretId: "sid", + SecretKey: "skey", + } + rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")} + + bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", rt) + testutil.Ok(t, err) + _, err = bkt.Get(context.Background(), "Test") + // We expect an error from the RoundTripper + testutil.NotOk(t, err) + testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err) +} diff --git a/providers/gcs/gcs.go b/providers/gcs/gcs.go index 87e31f66..b5dae200 100644 --- a/providers/gcs/gcs.go +++ b/providers/gcs/gcs.go @@ -52,7 +52,8 @@ type Config struct { // ChunkSizeBytes controls the maximum number of bytes of the object that the // Writer will attempt to send to the server in a single request // Used as storage.Writer.ChunkSize of https://pkg.go.dev/google.golang.org/cloud/storage#Writer - ChunkSizeBytes int `yaml:"chunk_size_bytes"` + ChunkSizeBytes int `yaml:"chunk_size_bytes"` + noAuth bool `yaml:"no_auth"` } // Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS. @@ -76,20 +77,22 @@ func parseConfig(conf []byte) (Config, error) { } // NewBucket returns a new Bucket against the given bucket handle. -func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) { +func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) { config, err := parseConfig(conf) if err != nil { return nil, err } - return NewBucketWithConfig(ctx, logger, config, component) + return NewBucketWithConfig(ctx, logger, config, component, rt) } // NewBucketWithConfig returns a new Bucket with gcs Config struct. -func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string) (*Bucket, error) { +func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) { if gc.Bucket == "" { return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks") } - + if rt != nil { + gc.HTTPConfig.Transport = rt + } var opts []option.ClientOption // If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic. @@ -100,7 +103,9 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp } opts = append(opts, option.WithCredentials(credentials)) } - + if gc.noAuth { + opts = append(opts, option.WithoutAuthentication()) + } opts = append(opts, option.WithUserAgent(fmt.Sprintf("thanos-%s/%s (%s)", component, version.Version, runtime.Version())), ) @@ -120,14 +125,12 @@ func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOp // Check if a roundtripper has been set in the config // otherwise build the default transport. var rt http.RoundTripper + rt, err := exthttp.DefaultTransport(gc.HTTPConfig) + if err != nil { + return nil, err + } if gc.HTTPConfig.Transport != nil { rt = gc.HTTPConfig.Transport - } else { - var err error - rt, err = exthttp.DefaultTransport(gc.HTTPConfig) - if err != nil { - return nil, err - } } // GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call @@ -312,7 +315,7 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error return nil, nil, err } - b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test") + b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test", nil) if err != nil { return nil, nil, err } diff --git a/providers/gcs/gcs_test.go b/providers/gcs/gcs_test.go index d03c5dd9..39b55041 100644 --- a/providers/gcs/gcs_test.go +++ b/providers/gcs/gcs_test.go @@ -15,7 +15,9 @@ import ( "github.com/efficientgo/core/testutil" "github.com/fullstorydev/emulators/storage/gcsemu" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/thanos-io/objstore/errutil" "google.golang.org/api/option" ) @@ -66,7 +68,7 @@ func TestNewBucketWithConfig_ShouldCreateGRPC(t *testing.T) { err = os.Setenv("STORAGE_EMULATOR_HOST_GRPC", svr.Addr) testutil.Ok(t, err) - bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket") + bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", nil) testutil.Ok(t, err) // Check if the bucket is created. @@ -157,3 +159,24 @@ http_config: }) } } + +func TestNewBucketWithErrorRoundTripper(t *testing.T) { + rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")} + cfg := Config{ + Bucket: "test-bucket", + ServiceAccount: "", + UseGRPC: false, + noAuth: true, + } + svr, err := gcsemu.NewServer("127.0.0.1:0", gcsemu.Options{}) + testutil.Ok(t, err) + defer svr.Close() + err = os.Setenv("STORAGE_EMULATOR_HOST", svr.Addr) + testutil.Ok(t, err) + + bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", rt) + testutil.Ok(t, err) + _, err = bkt.Get(context.Background(), "test-bucket") + testutil.NotOk(t, err) + testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err) +} diff --git a/providers/obs/obs.go b/providers/obs/obs.go index 7bd9666b..7de70633 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -75,11 +75,11 @@ type Bucket struct { } func NewBucket(logger log.Logger, conf []byte) (*Bucket, error) { + // TODO(https://github.com/thanos-io/objstore/pull/140): Add support for custom roundtripper. config, err := parseConfig(conf) if err != nil { return nil, errors.Wrap(err, "parsing cos configuration") } - return NewBucketWithConfig(logger, config) } diff --git a/providers/oci/oci.go b/providers/oci/oci.go index 2db35461..e2f9e98b 100644 --- a/providers/oci/oci.go +++ b/providers/oci/oci.go @@ -58,13 +58,14 @@ type HTTPConfig struct { ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"` InsecureSkipVerify bool `yaml:"insecure_skip_verify"` - TLSHandshakeTimeout model.Duration `yaml:"tls_handshake_timeout"` - ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"` - MaxIdleConns int `yaml:"max_idle_conns"` - MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"` - MaxConnsPerHost int `yaml:"max_conns_per_host"` - DisableCompression bool `yaml:"disable_compression"` - ClientTimeout time.Duration `yaml:"client_timeout"` + TLSHandshakeTimeout model.Duration `yaml:"tls_handshake_timeout"` + ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"` + MaxIdleConns int `yaml:"max_idle_conns"` + MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"` + MaxConnsPerHost int `yaml:"max_conns_per_host"` + DisableCompression bool `yaml:"disable_compression"` + ClientTimeout time.Duration `yaml:"client_timeout"` + Transport http.RoundTripper `yaml:"-"` } // Config stores the configuration for oci bucket. @@ -288,7 +289,7 @@ func (b *Bucket) deleteBucket(ctx context.Context) (err error) { } // NewBucket returns a new Bucket using the provided oci config values. -func NewBucket(logger log.Logger, ociConfig []byte) (*Bucket, error) { +func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Bucket, error) { level.Debug(logger).Log("msg", "creating new oci bucket connection") var config = DefaultConfig var configurationProvider common.ConfigurationProvider @@ -335,8 +336,12 @@ func NewBucket(logger log.Logger, ociConfig []byte) (*Bucket, error) { return nil, errors.Wrapf(err, "unable to create ObjectStorage client with the given oci configurations") } + config.HTTPConfig.Transport = CustomTransport(config) + if rt != nil { + config.HTTPConfig.Transport = rt + } httpClient := http.Client{ - Transport: CustomTransport(config), + Transport: config.HTTPConfig.Transport, Timeout: config.HTTPConfig.ClientTimeout, } client.HTTPClient = &httpClient @@ -375,7 +380,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { return nil, nil, err } - bkt, err := NewBucket(log.NewNopLogger(), ociConfig) + bkt, err := NewBucket(log.NewNopLogger(), ociConfig, nil) if err != nil { return nil, nil, err } diff --git a/providers/oci/oci_test.go b/providers/oci/oci_test.go new file mode 100644 index 00000000..0fc81ccf --- /dev/null +++ b/providers/oci/oci_test.go @@ -0,0 +1,47 @@ +package oci + +import ( + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/objstore/errutil" + "gopkg.in/yaml.v2" +) + +func TestNewBucketWithErrorRoundTripper(t *testing.T) { + const mockPrivateKey = `-----BEGIN RSA PRIVATE KEY----- +MIICXgIBAAKBgQDCFENGw33yGihy92pDjZQhl0C36rPJj+CvfSC8+q28hxA161QF +NUd13wuCTUcq0Qd2qsBe/2hFyc2DCJJg0h1L78+6Z4UMR7EOcpfdUE9Hf3m/hs+F +UR45uBJeDK1HSFHD8bHKD6kv8FPGfJTotc+2xjJwoYi+1hqp1fIekaxsyQIDAQAB +AoGBAJR8ZkCUvx5kzv+utdl7T5MnordT1TvoXXJGXK7ZZ+UuvMNUCdN2QPc4sBiA +QWvLw1cSKt5DsKZ8UETpYPy8pPYnnDEz2dDYiaew9+xEpubyeW2oH4Zx71wqBtOK +kqwrXa/pzdpiucRRjk6vE6YY7EBBs/g7uanVpGibOVAEsqH1AkEA7DkjVH28WDUg +f1nqvfn2Kj6CT7nIcE3jGJsZZ7zlZmBmHFDONMLUrXR/Zm3pR5m0tCmBqa5RK95u +412jt1dPIwJBANJT3v8pnkth48bQo/fKel6uEYyboRtA5/uHuHkZ6FQF7OUkGogc +mSJluOdc5t6hI1VsLn0QZEjQZMEOWr+wKSMCQQCC4kXJEsHAve77oP6HtG/IiEn7 +kpyUXRNvFsDE0czpJJBvL/aRFUJxuRK91jhjC68sA7NsKMGg5OXb5I5Jj36xAkEA +gIT7aFOYBFwGgQAQkWNKLvySgKbAZRTeLBacpHMuQdl1DfdntvAyqpAZ0lY0RKmW +G6aFKaqQfOXKCyWoUiVknQJAXrlgySFci/2ueKlIE1QqIiLSZ8V8OlpFLRnb1pzI +7U1yQXnTAEFYM560yJlzUpOb1V4cScGd365tiSMvxLOvTA== +-----END RSA PRIVATE KEY-----` + + config := DefaultConfig + config.Provider = "raw" + config.Tenancy = "test" + config.User = "test" + config.Region = "test" + config.Fingerprint = "123" + config.PrivateKey = mockPrivateKey + config.Passphrase = "123" + ociConfig, err := yaml.Marshal(config) + testutil.Ok(t, err) + + rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")} + + _, err = NewBucket(log.NewNopLogger(), ociConfig, rt) + // We expect an error from the RoundTripper + testutil.NotOk(t, err) + testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err) +} diff --git a/providers/oss/oss.go b/providers/oss/oss.go index e7e3a648..0a4cc76e 100644 --- a/providers/oss/oss.go +++ b/providers/oss/oss.go @@ -158,22 +158,26 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt } // NewBucket returns a new Bucket using the provided oss config values. -func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) { +func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) { var config Config if err := yaml.Unmarshal(conf, &config); err != nil { return nil, errors.Wrap(err, "parse aliyun oss config file failed") } - - return NewBucketWithConfig(logger, config, component) + return NewBucketWithConfig(logger, config, component, rt) } // NewBucketWithConfig returns a new Bucket using the provided oss config struct. -func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) { +func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) { if err := validate(config); err != nil { return nil, err } - client, err := alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret) + if rt != nil { + clientOption := func(client *alioss.Client) { + client.HTTPClient = &http.Client{Transport: rt} + } + client, err = alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret, clientOption) + } if err != nil { return nil, errors.Wrap(err, "create aliyun oss client failed") } @@ -274,7 +278,7 @@ func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool) (objstore return nil, nil, err } - b, err := NewBucket(log.NewNopLogger(), bc, "thanos-aliyun-oss-test") + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-aliyun-oss-test", nil) if err != nil { return nil, nil, err } diff --git a/providers/oss/oss_test.go b/providers/oss/oss_test.go new file mode 100644 index 00000000..99ad68a3 --- /dev/null +++ b/providers/oss/oss_test.go @@ -0,0 +1,28 @@ +package oss + +import ( + "context" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/objstore/errutil" +) + +func TestNewBucketWithErrorRoundTripper(t *testing.T) { + config := Config{ + Endpoint: "http://test.com/", + AccessKeyID: "123", + AccessKeySecret: "123", + Bucket: "test", + } + rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")} + + bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", rt) + // We expect an error from the RoundTripper + testutil.Ok(t, err) + _, err = bkt.Get(context.Background(), "test") + testutil.NotOk(t, err) + testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err) +} diff --git a/providers/s3/s3.go b/providers/s3/s3.go index afd0982d..3d9ba213 100644 --- a/providers/s3/s3.go +++ b/providers/s3/s3.go @@ -175,13 +175,13 @@ func parseConfig(conf []byte) (Config, error) { } // NewBucket returns a new Bucket using the provided s3 config values. -func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) { +func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) { config, err := parseConfig(conf) if err != nil { return nil, err } - return NewBucketWithConfig(logger, config, component) + return NewBucketWithConfig(logger, config, component, rt) } type overrideSignerType struct { @@ -201,7 +201,7 @@ func (s *overrideSignerType) Retrieve() (credentials.Value, error) { } // NewBucketWithConfig returns a new Bucket using the provided s3 config values. -func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) { +func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) { var chain []credentials.Provider // TODO(bwplotka): Don't do flags as they won't scale, use actual params like v2, v4 instead @@ -241,25 +241,25 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B }), } } - + if rt != nil { + config.HTTPConfig.Transport = rt + } // Check if a roundtripper has been set in the config // otherwise build the default transport. - var rt http.RoundTripper + var tpt http.RoundTripper + tpt, err := exthttp.DefaultTransport(config.HTTPConfig) + if err != nil { + return nil, err + } if config.HTTPConfig.Transport != nil { - rt = config.HTTPConfig.Transport - } else { - var err error - rt, err = exthttp.DefaultTransport(config.HTTPConfig) - if err != nil { - return nil, err - } + tpt = config.HTTPConfig.Transport } client, err := minio.New(config.Endpoint, &minio.Options{ Creds: credentials.NewChainCredentials(chain), Secure: !config.Insecure, Region: config.Region, - Transport: rt, + Transport: tpt, BucketLookup: config.BucketLookupType.MinioType(), }) if err != nil { @@ -601,7 +601,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke if err != nil { return nil, nil, err } - b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test") + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", nil) if err != nil { return nil, nil, err } diff --git a/providers/s3/s3_e2e_test.go b/providers/s3/s3_e2e_test.go index 4b75a014..ac9ec261 100644 --- a/providers/s3/s3_e2e_test.go +++ b/providers/s3/s3_e2e_test.go @@ -37,6 +37,7 @@ func BenchmarkUpload(b *testing.B) { log.NewNopLogger(), e2ethanos.NewS3Config(bucket, m.Endpoint("https"), m.Dir()), "test-feed", + nil, ) testutil.Ok(b, err) diff --git a/providers/s3/s3_test.go b/providers/s3/s3_test.go index cdab39c3..2a44f0e0 100644 --- a/providers/s3/s3_test.go +++ b/providers/s3/s3_test.go @@ -16,7 +16,9 @@ import ( "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/minio/minio-go/v7/pkg/encrypt" + "github.com/pkg/errors" + "github.com/thanos-io/objstore/errutil" "github.com/thanos-io/objstore/exthttp" ) @@ -324,7 +326,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) { // Default config should return no SSE config. cfg := DefaultConfig cfg.Endpoint = endpoint - bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", nil) testutil.Ok(t, err) sse, err := bkt.getServerSideEncryption(context.Background()) @@ -335,7 +337,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) { cfg = DefaultConfig cfg.Endpoint = endpoint cfg.SSEConfig = SSEConfig{Type: SSES3} - bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", nil) testutil.Ok(t, err) sse, err = bkt.getServerSideEncryption(context.Background()) @@ -351,7 +353,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) { Type: SSEKMS, KMSKeyID: "key", } - bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", nil) testutil.Ok(t, err) sse, err = bkt.getServerSideEncryption(context.Background()) @@ -375,7 +377,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) { KMSKeyID: "key", KMSEncryptionContext: map[string]string{"foo": "bar"}, } - bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", nil) testutil.Ok(t, err) sse, err = bkt.getServerSideEncryption(context.Background()) @@ -396,7 +398,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) { override, err := encrypt.NewSSEKMS("test", nil) testutil.Ok(t, err) - bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", nil) testutil.Ok(t, err) sse, err = bkt.getServerSideEncryption(context.WithValue(context.Background(), sseConfigKey, override)) @@ -423,7 +425,7 @@ func TestBucket_Get_ShouldReturnErrorIfServerTruncateResponse(t *testing.T) { cfg.AccessKey = "test" cfg.SecretKey = "test" - bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", nil) testutil.Ok(t, err) reader, err := bkt.Get(context.Background(), "test") @@ -448,7 +450,7 @@ func TestParseConfig_CustomStorageClass(t *testing.T) { cfg.Endpoint = endpoint storageClass := "STANDARD_IA" cfg.PutUserMetadata[testCase.storageClassKey] = storageClass - bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", nil) testutil.Ok(t, err) testutil.Equals(t, storageClass, bkt.storageClass) }) @@ -458,7 +460,20 @@ func TestParseConfig_CustomStorageClass(t *testing.T) { func TestParseConfig_DefaultStorageClassIsZero(t *testing.T) { cfg := DefaultConfig cfg.Endpoint = endpoint - bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", nil) testutil.Ok(t, err) testutil.Equals(t, "", bkt.storageClass) } + +func TestNewBucketWithErrorRoundTripper(t *testing.T) { + cfg := DefaultConfig + cfg.Endpoint = endpoint + cfg.Bucket = "test" + rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")} + bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", rt) + testutil.Ok(t, err) + _, err = bkt.Get(context.Background(), "test") + // We expect an error from the RoundTripper + testutil.NotOk(t, err) + testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err) +} diff --git a/providers/swift/swift.go b/providers/swift/swift.go index a8c56c55..682c494c 100644 --- a/providers/swift/swift.go +++ b/providers/swift/swift.go @@ -154,12 +154,12 @@ type Container struct { segmentsContainer string } -func NewContainer(logger log.Logger, conf []byte) (*Container, error) { +func NewContainer(logger log.Logger, conf []byte, rt http.RoundTripper) (*Container, error) { sc, err := parseConfig(conf) if err != nil { return nil, errors.Wrap(err, "parse config") } - return NewContainerFromConfig(logger, sc, false) + return NewContainerFromConfig(logger, sc, false, rt) } func ensureContainer(connection *swift.Connection, name string, createIfNotExist bool) error { @@ -178,22 +178,22 @@ func ensureContainer(connection *swift.Connection, name string, createIfNotExist return nil } -func NewContainerFromConfig(logger log.Logger, sc *Config, createContainer bool) (*Container, error) { - +func NewContainerFromConfig(logger log.Logger, sc *Config, createContainer bool, rt http.RoundTripper) (*Container, error) { + if rt != nil { + sc.HTTPConfig.Transport = rt + } // Check if a roundtripper has been set in the config // otherwise build the default transport. - var rt http.RoundTripper + var tpt http.RoundTripper + tpt, err := exthttp.DefaultTransport(sc.HTTPConfig) + if err != nil { + return nil, err + } if sc.HTTPConfig.Transport != nil { - rt = sc.HTTPConfig.Transport - } else { - var err error - rt, err = exthttp.DefaultTransport(sc.HTTPConfig) - if err != nil { - return nil, err - } + tpt = sc.HTTPConfig.Transport } - connection := connectionFromConfig(sc, rt) + connection := connectionFromConfig(sc, tpt) if err := connection.Authenticate(); err != nil { return nil, errors.Wrap(err, "authentication") } @@ -378,7 +378,7 @@ func NewTestContainer(t testing.TB) (objstore.Bucket, func(), error) { "needs to be manually cleared. This means that it is only useful to run one test in a time. This is due " + "to safety (accidentally pointing prod container for test) as well as swift not being fully strong consistent.") } - c, err := NewContainerFromConfig(log.NewNopLogger(), config, false) + c, err := NewContainerFromConfig(log.NewNopLogger(), config, false, nil) if err != nil { return nil, nil, errors.Wrap(err, "initializing new container") } @@ -392,7 +392,7 @@ func NewTestContainer(t testing.TB) (objstore.Bucket, func(), error) { } config.ContainerName = objstore.CreateTemporaryTestBucketName(t) config.SegmentContainerName = config.ContainerName - c, err := NewContainerFromConfig(log.NewNopLogger(), config, true) + c, err := NewContainerFromConfig(log.NewNopLogger(), config, true, nil) if err != nil { return nil, nil, errors.Wrap(err, "initializing new container") } diff --git a/providers/swift/swift_test.go b/providers/swift/swift_test.go index 656e7756..629dfd77 100644 --- a/providers/swift/swift_test.go +++ b/providers/swift/swift_test.go @@ -8,7 +8,10 @@ import ( "time" "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/thanos-io/objstore/errutil" ) func TestParseConfig(t *testing.T) { @@ -64,3 +67,15 @@ http_config: testutil.Equals(t, false, cfg.HTTPConfig.InsecureSkipVerify) } + +func TestNewBucketWithErrorRoundTripper(t *testing.T) { + logger := log.NewNopLogger() + rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")} + config := DefaultConfig + config.AuthUrl = "http://identity.something.com/v3" + _, err := NewContainerFromConfig(logger, &config, false, rt) + + // We expect an error from the RoundTripper + testutil.NotOk(t, err) + testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err) +}