From b6bd2abe2d73a4ef3742f0cfac90bc70160290a0 Mon Sep 17 00:00:00 2001 From: Matheus Pimenta Date: Mon, 5 Aug 2024 13:01:00 +0100 Subject: [PATCH] Add proxy support for Azure buckets Signed-off-by: Matheus Pimenta --- api/v1beta2/bucket_types.go | 2 +- .../source.toolkit.fluxcd.io_buckets.yaml | 2 +- docs/api/v1beta2/source.md | 4 +- docs/spec/v1beta2/buckets.md | 2 +- internal/controller/bucket_controller.go | 9 +- pkg/azure/blob.go | 91 ++++++++++++++++--- pkg/azure/blob_integration_test.go | 20 ++-- pkg/azure/blob_test.go | 89 ++++++++++++++++++ 8 files changed, 191 insertions(+), 28 deletions(-) diff --git a/api/v1beta2/bucket_types.go b/api/v1beta2/bucket_types.go index 010f89897..aa0499731 100644 --- a/api/v1beta2/bucket_types.go +++ b/api/v1beta2/bucket_types.go @@ -113,7 +113,7 @@ type BucketSpec struct { // ProxySecretRef specifies the Secret containing the proxy configuration // to use while communicating with the Bucket server. // - // Only supported for the `generic` and `gcp` providers. + // Only supported for the `generic`, `gcp` and `azure` providers. // +optional ProxySecretRef *meta.LocalObjectReference `json:"proxySecretRef,omitempty"` diff --git a/config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml b/config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml index 636fbad2b..73cc94eed 100644 --- a/config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml +++ b/config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml @@ -397,7 +397,7 @@ spec: to use while communicating with the Bucket server. - Only supported for the `generic` and `gcp` providers. + Only supported for the `generic`, `gcp` and `azure` providers. properties: name: description: Name of the referent. diff --git a/docs/api/v1beta2/source.md b/docs/api/v1beta2/source.md index 2070115f6..fb7ec87c9 100644 --- a/docs/api/v1beta2/source.md +++ b/docs/api/v1beta2/source.md @@ -219,7 +219,7 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference (Optional)

ProxySecretRef specifies the Secret containing the proxy configuration to use while communicating with the Bucket server.

-

Only supported for the generic and gcp providers.

+

Only supported for the generic, gcp and azure providers.

@@ -1648,7 +1648,7 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference (Optional)

ProxySecretRef specifies the Secret containing the proxy configuration to use while communicating with the Bucket server.

-

Only supported for the generic and gcp providers.

+

Only supported for the generic, gcp and azure providers.

diff --git a/docs/spec/v1beta2/buckets.md b/docs/spec/v1beta2/buckets.md index cfe638744..df0c5eb80 100644 --- a/docs/spec/v1beta2/buckets.md +++ b/docs/spec/v1beta2/buckets.md @@ -854,7 +854,7 @@ The Secret can contain three keys: - `password`, to specify the password to use if the proxy server is protected by basic authentication. This is an optional key. -This API is only supported for the `generic` and `gcp` [providers](#provider). +This API is only supported for the `generic`, `gcp` and `azure` [providers](#provider). Example: diff --git a/internal/controller/bucket_controller.go b/internal/controller/bucket_controller.go index 8409b83a4..29dbd37c3 100644 --- a/internal/controller/bucket_controller.go +++ b/internal/controller/bucket_controller.go @@ -465,7 +465,14 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e) return sreconcile.ResultEmpty, e } - if provider, err = azure.NewClient(obj, secret); err != nil { + var opts []azure.Option + if secret != nil { + opts = append(opts, azure.WithSecret(secret)) + } + if proxyURL != nil { + opts = append(opts, azure.WithProxyURL(proxyURL)) + } + if provider, err = azure.NewClient(obj, opts...); err != nil { e := serror.NewGeneric(err, "ClientError") conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e) return sreconcile.ResultEmpty, e diff --git a/pkg/azure/blob.go b/pkg/azure/blob.go index 940f429b7..c95c9754d 100644 --- a/pkg/azure/blob.go +++ b/pkg/azure/blob.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "io" + "net/http" "net/url" "os" "path/filepath" @@ -64,6 +65,48 @@ type BlobClient struct { *azblob.Client } +// Option configures the BlobClient. +type Option func(*options) + +// WithSecret sets the Secret to use for the BlobClient. +func WithSecret(secret *corev1.Secret) Option { + return func(o *options) { + o.secret = secret + } +} + +// WithProxyURL sets the proxy URL to use for the BlobClient. +func WithProxyURL(proxyURL *url.URL) Option { + return func(o *options) { + o.proxyURL = proxyURL + } +} + +type options struct { + secret *corev1.Secret + proxyURL *url.URL + withoutCredentials bool + withoutRetries bool +} + +// withoutCredentials forces the BlobClient to not use any credentials. +// This is a test-only option useful for testing the client with HTTP +// endpoints (without TLS) alongside all the other options unrelated to +// credentials. +func withoutCredentials() Option { + return func(o *options) { + o.withoutCredentials = true + } +} + +// withoutRetries sets the BlobClient to not retry requests. +// This is a test-only option useful for testing connection errors. +func withoutRetries() Option { + return func(o *options) { + o.withoutRetries = true + } +} + // NewClient creates a new Azure Blob storage client. // The credential config on the client is set based on the data from the // Bucket and Secret. It detects credentials in the Secret in the following @@ -87,56 +130,80 @@ type BlobClient struct { // // If no credentials are found, and the azidentity.ChainedTokenCredential can // not be established. A simple client without credentials is returned. -func NewClient(obj *sourcev1.Bucket, secret *corev1.Secret) (c *BlobClient, err error) { +func NewClient(obj *sourcev1.Bucket, opts ...Option) (c *BlobClient, err error) { c = &BlobClient{} + var o options + for _, opt := range opts { + opt(&o) + } + + clientOpts := &azblob.ClientOptions{} + + if o.proxyURL != nil { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.Proxy = http.ProxyURL(o.proxyURL) + clientOpts.ClientOptions.Transport = &http.Client{Transport: transport} + } + + if o.withoutRetries { + clientOpts.ClientOptions.Retry.ShouldRetry = func(resp *http.Response, err error) bool { + return false + } + } + + if o.withoutCredentials { + c.Client, err = azblob.NewClientWithNoCredential(obj.Spec.Endpoint, clientOpts) + return + } + var token azcore.TokenCredential - if secret != nil && len(secret.Data) > 0 { + if o.secret != nil && len(o.secret.Data) > 0 { // Attempt AAD Token Credential options first. - if token, err = tokenCredentialFromSecret(secret); err != nil { - err = fmt.Errorf("failed to create token credential from '%s' Secret: %w", secret.Name, err) + if token, err = tokenCredentialFromSecret(o.secret); err != nil { + err = fmt.Errorf("failed to create token credential from '%s' Secret: %w", o.secret.Name, err) return } if token != nil { - c.Client, err = azblob.NewClient(obj.Spec.Endpoint, token, nil) + c.Client, err = azblob.NewClient(obj.Spec.Endpoint, token, clientOpts) return } // Fallback to Shared Key Credential. var cred *azblob.SharedKeyCredential - if cred, err = sharedCredentialFromSecret(obj.Spec.Endpoint, secret); err != nil { + if cred, err = sharedCredentialFromSecret(obj.Spec.Endpoint, o.secret); err != nil { return } if cred != nil { - c.Client, err = azblob.NewClientWithSharedKeyCredential(obj.Spec.Endpoint, cred, &azblob.ClientOptions{}) + c.Client, err = azblob.NewClientWithSharedKeyCredential(obj.Spec.Endpoint, cred, clientOpts) return } var fullPath string - if fullPath, err = sasTokenFromSecret(obj.Spec.Endpoint, secret); err != nil { + if fullPath, err = sasTokenFromSecret(obj.Spec.Endpoint, o.secret); err != nil { return } - c.Client, err = azblob.NewClientWithNoCredential(fullPath, &azblob.ClientOptions{}) + c.Client, err = azblob.NewClientWithNoCredential(fullPath, clientOpts) return } // Compose token chain based on environment. // This functions as a replacement for azidentity.NewDefaultAzureCredential // to not shell out. - token, err = chainCredentialWithSecret(secret) + token, err = chainCredentialWithSecret(o.secret) if err != nil { err = fmt.Errorf("failed to create environment credential chain: %w", err) return nil, err } if token != nil { - c.Client, err = azblob.NewClient(obj.Spec.Endpoint, token, nil) + c.Client, err = azblob.NewClient(obj.Spec.Endpoint, token, clientOpts) return } // Fallback to simple client. - c.Client, err = azblob.NewClientWithNoCredential(obj.Spec.Endpoint, nil) + c.Client, err = azblob.NewClientWithNoCredential(obj.Spec.Endpoint, clientOpts) return } diff --git a/pkg/azure/blob_integration_test.go b/pkg/azure/blob_integration_test.go index d4adde309..1d1040adb 100644 --- a/pkg/azure/blob_integration_test.go +++ b/pkg/azure/blob_integration_test.go @@ -94,7 +94,7 @@ func TestMain(m *testing.M) { func TestBlobClient_BucketExists(t *testing.T) { g := NewWithT(t) - client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy()) + client, err := NewClient(testBucket.DeepCopy(), WithSecret(testSecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) g.Expect(client).ToNot(BeNil()) @@ -120,7 +120,7 @@ func TestBlobClient_BucketExists(t *testing.T) { func TestBlobClient_BucketNotExists(t *testing.T) { g := NewWithT(t) - client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy()) + client, err := NewClient(testBucket.DeepCopy(), WithSecret(testSecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) g.Expect(client).ToNot(BeNil()) @@ -140,7 +140,7 @@ func TestBlobClient_FGetObject(t *testing.T) { tempDir := t.TempDir() - client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy()) + client, err := NewClient(testBucket.DeepCopy(), WithSecret(testSecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) g.Expect(client).ToNot(BeNil()) @@ -180,7 +180,7 @@ func TestBlobClientSASKey_FGetObject(t *testing.T) { tempDir := t.TempDir() // create a client with the shared key - client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy()) + client, err := NewClient(testBucket.DeepCopy(), WithSecret(testSecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) g.Expect(client).ToNot(BeNil()) @@ -221,7 +221,7 @@ func TestBlobClientSASKey_FGetObject(t *testing.T) { }, } - sasKeyClient, err := NewClient(testBucket.DeepCopy(), testSASKeySecret.DeepCopy()) + sasKeyClient, err := NewClient(testBucket.DeepCopy(), WithSecret(testSASKeySecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) // Test if bucket and blob exists using sasKey. @@ -246,7 +246,7 @@ func TestBlobClientContainerSASKey_BucketExists(t *testing.T) { g := NewWithT(t) // create a client with the shared key - client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy()) + client, err := NewClient(testBucket.DeepCopy(), WithSecret(testSecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) g.Expect(client).ToNot(BeNil()) @@ -286,7 +286,7 @@ func TestBlobClientContainerSASKey_BucketExists(t *testing.T) { }, } - sasKeyClient, err := NewClient(testBucket.DeepCopy(), testSASKeySecret.DeepCopy()) + sasKeyClient, err := NewClient(testBucket.DeepCopy(), WithSecret(testSASKeySecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) ctx, timeout = context.WithTimeout(context.Background(), testTimeout) @@ -308,7 +308,7 @@ func TestBlobClientContainerSASKey_BucketExists(t *testing.T) { func TestBlobClient_FGetObject_NotFoundErr(t *testing.T) { g := NewWithT(t) - client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy()) + client, err := NewClient(testBucket.DeepCopy(), WithSecret(testSecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) g.Expect(client).ToNot(BeNil()) @@ -335,7 +335,7 @@ func TestBlobClient_FGetObject_NotFoundErr(t *testing.T) { func TestBlobClient_VisitObjects(t *testing.T) { g := NewWithT(t) - client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy()) + client, err := NewClient(testBucket.DeepCopy(), WithSecret(testSecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) g.Expect(client).ToNot(BeNil()) @@ -375,7 +375,7 @@ func TestBlobClient_VisitObjects(t *testing.T) { func TestBlobClient_VisitObjects_CallbackErr(t *testing.T) { g := NewWithT(t) - client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy()) + client, err := NewClient(testBucket.DeepCopy(), WithSecret(testSecret.DeepCopy())) g.Expect(err).ToNot(HaveOccurred()) g.Expect(client).ToNot(BeNil()) diff --git a/pkg/azure/blob_test.go b/pkg/azure/blob_test.go index 56a3ca0b9..240376f2b 100644 --- a/pkg/azure/blob_test.go +++ b/pkg/azure/blob_test.go @@ -18,6 +18,7 @@ package azure import ( "bytes" + "context" "crypto/rand" "crypto/rsa" "crypto/x509" @@ -25,6 +26,7 @@ import ( "errors" "fmt" "math/big" + "net/http" "net/url" "testing" @@ -34,8 +36,95 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + testlistener "github.com/fluxcd/source-controller/tests/listener" + testproxy "github.com/fluxcd/source-controller/tests/proxy" ) +func TestNewClientAndBucketExistsWithProxy(t *testing.T) { + g := NewWithT(t) + + proxyAddr, proxyPort := testproxy.New(t) + + // start mock bucket server + bucketListener, bucketAddr, _ := testlistener.New(t) + bucketEndpoint := fmt.Sprintf("http://%s", bucketAddr) + bucketHandler := http.NewServeMux() + bucketHandler.HandleFunc("GET /podinfo", func(w http.ResponseWriter, r *http.Request) { + // verify query params comp=list&maxresults=1&restype=container + q := r.URL.Query() + g.Expect(q.Get("comp")).To(Equal("list")) + g.Expect(q.Get("maxresults")).To(Equal("1")) + g.Expect(q.Get("restype")).To(Equal("container")) + // the azure library does not expose the struct for this response + // and copying its definition yields a strange "unsupported type" + // error when marshaling to xml, so we just hardcode a valid response + // here + resp := fmt.Sprintf(` + +1 + + +`, bucketEndpoint) + _, err := w.Write([]byte(resp)) + g.Expect(err).ToNot(HaveOccurred()) + }) + bucketServer := &http.Server{ + Addr: bucketAddr, + Handler: bucketHandler, + } + go bucketServer.Serve(bucketListener) + defer bucketServer.Shutdown(context.Background()) + + tests := []struct { + name string + endpoint string + proxyURL *url.URL + err string + }{ + { + name: "with correct proxy", + endpoint: bucketEndpoint, + proxyURL: &url.URL{Scheme: "http", Host: proxyAddr}, + }, + { + name: "with incorrect proxy", + endpoint: bucketEndpoint, + proxyURL: &url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", proxyPort+1)}, + err: "connection refused", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + bucket := &sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + Endpoint: tt.endpoint, + }, + } + + client, err := NewClient(bucket, + WithProxyURL(tt.proxyURL), + withoutCredentials(), + withoutRetries()) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(client).ToNot(BeNil()) + + ok, err := client.BucketExists(context.Background(), "podinfo") + if tt.err != "" { + g.Expect(err.Error()).To(ContainSubstring(tt.err)) + g.Expect(ok).To(BeFalse()) + } else { + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(ok).To(BeTrue()) + } + }) + } +} + func TestValidateSecret(t *testing.T) { tests := []struct { name string