Skip to content

Commit

Permalink
Merge pull request #1565 from matheuscscp/bucket-gcp-proxy
Browse files Browse the repository at this point in the history
Add proxy support for GCS buckets
matheuscscp authored Aug 9, 2024
2 parents c41c2d6 + 31ed900 commit 67f6cba
Showing 9 changed files with 256 additions and 33 deletions.
2 changes: 1 addition & 1 deletion api/v1beta2/bucket_types.go
Original file line number Diff line number Diff line change
@@ -113,7 +113,7 @@ type BucketSpec struct {
// ProxySecretRef specifies the Secret containing the proxy configuration
// to use while communicating with the Bucket server.
//
// Only supported for the generic provider.
// Only supported for the `generic` and `gcp` providers.
// +optional
ProxySecretRef *meta.LocalObjectReference `json:"proxySecretRef,omitempty"`

2 changes: 1 addition & 1 deletion config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml
Original file line number Diff line number Diff line change
@@ -397,7 +397,7 @@ spec:
to use while communicating with the Bucket server.
Only supported for the generic provider.
Only supported for the `generic` and `gcp` providers.
properties:
name:
description: Name of the referent.
4 changes: 2 additions & 2 deletions docs/api/v1beta2/source.md
Original file line number Diff line number Diff line change
@@ -219,7 +219,7 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference
<em>(Optional)</em>
<p>ProxySecretRef specifies the Secret containing the proxy configuration
to use while communicating with the Bucket server.</p>
<p>Only supported for the generic provider.</p>
<p>Only supported for the <code>generic</code> and <code>gcp</code> providers.</p>
</td>
</tr>
<tr>
@@ -1648,7 +1648,7 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference
<em>(Optional)</em>
<p>ProxySecretRef specifies the Secret containing the proxy configuration
to use while communicating with the Bucket server.</p>
<p>Only supported for the generic provider.</p>
<p>Only supported for the <code>generic</code> and <code>gcp</code> providers.</p>
</td>
</tr>
<tr>
2 changes: 1 addition & 1 deletion docs/spec/v1beta2/buckets.md
Original file line number Diff line number Diff line change
@@ -854,7 +854,7 @@ The Secret can contain three keys:
- `password`, to specify the password to use if the proxy server is protected by
basic authentication. This is an optional key.

This API is only supported for the `generic` [provider](#provider).
This API is only supported for the `generic` and `gcp` [providers](#provider).

Example:

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ replace github.com/fluxcd/source-controller/api => ./api
replace github.com/opencontainers/go-digest => github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be

require (
cloud.google.com/go/compute/metadata v0.3.0
cloud.google.com/go/storage v1.39.1
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1
@@ -60,6 +61,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spf13/pflag v1.0.5
golang.org/x/crypto v0.22.0
golang.org/x/oauth2 v0.19.0
golang.org/x/sync v0.7.0
google.golang.org/api v0.177.0
gotest.tools v2.2.0+incompatible
@@ -77,7 +79,6 @@ require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go/auth v0.3.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.6 // indirect
dario.cat/mergo v1.0.0 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
@@ -360,7 +361,6 @@ require (
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
21 changes: 14 additions & 7 deletions internal/controller/bucket_controller.go
Original file line number Diff line number Diff line change
@@ -431,6 +431,12 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
// Return error as the world as observed may change
return sreconcile.ResultEmpty, e
}
proxyURL, err := r.getProxyURL(ctx, obj)
if err != nil {
e := serror.NewGeneric(err, sourcev1.AuthenticationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e)
return sreconcile.ResultEmpty, e
}

// Construct provider client
var provider BucketProvider
@@ -441,7 +447,14 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e)
return sreconcile.ResultEmpty, e
}
if provider, err = gcp.NewClient(ctx, secret); err != nil {
var opts []gcp.Option
if secret != nil {
opts = append(opts, gcp.WithSecret(secret))
}
if proxyURL != nil {
opts = append(opts, gcp.WithProxyURL(proxyURL))
}
if provider, err = gcp.NewClient(ctx, opts...); err != nil {
e := serror.NewGeneric(err, "ClientError")
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e)
return sreconcile.ResultEmpty, e
@@ -482,12 +495,6 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e)
return sreconcile.ResultEmpty, e
}
proxyURL, err := r.getProxyURL(ctx, obj)
if err != nil {
e := serror.NewGeneric(err, sourcev1.AuthenticationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}
var opts []minio.Option
if secret != nil {
opts = append(opts, minio.WithSecret(secret))
51 changes: 49 additions & 2 deletions internal/controller/bucket_controller_test.go
Original file line number Diff line number Diff line change
@@ -445,7 +445,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
assertConditions []metav1.Condition
}{
{
name: "Reconciles GCS source",
name: "Reconciles generic source",
bucketName: "dummy",
bucketObjects: []*s3mock.Object{
{
@@ -972,6 +972,49 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
},
},
{
name: "Observes non-existing proxySecretRef",
bucketName: "dummy",
beforeFunc: func(obj *bucketv1.Bucket) {
obj.Spec.ProxySecretRef = &meta.LocalObjectReference{
Name: "dummy",
}
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
},
want: sreconcile.ResultEmpty,
wantErr: true,
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
},
},
{
name: "Observes invalid proxySecretRef",
bucketName: "dummy",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "dummy",
},
},
beforeFunc: func(obj *bucketv1.Bucket) {
obj.Spec.ProxySecretRef = &meta.LocalObjectReference{
Name: "dummy",
}
conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
},
want: sreconcile.ResultEmpty,
wantErr: true,
assertIndex: index.NewDigester(),
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid proxy secret '/dummy': key 'address' is missing"),
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
},
},
{
name: "Observes non-existing bucket name",
bucketName: "dummy",
@@ -1217,7 +1260,11 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
sp := patch.NewSerialPatcher(obj, r.Client)

got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir)
g.Expect(err != nil).To(Equal(tt.wantErr))
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).ToNot(HaveOccurred())
}
g.Expect(got).To(Equal(tt.want))

g.Expect(index.Index()).To(Equal(tt.assertIndex.Index()))
98 changes: 87 additions & 11 deletions pkg/gcp/gcp.go
Original file line number Diff line number Diff line change
@@ -21,13 +21,17 @@ import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"

gcpstorage "cloud.google.com/go/storage"
"github.com/go-logr/logr"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
htransport "google.golang.org/api/transport/http"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
)
@@ -48,24 +52,96 @@ type GCSClient struct {
*gcpstorage.Client
}

// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
// Option is a functional option for configuring the GCS client.
type Option func(*options)

// WithSecret sets the secret to use for authenticating with GCP.
func WithSecret(secret *corev1.Secret) Option {
return func(o *options) {
o.secret = secret
}
}

// WithProxyURL sets the proxy URL to use for the GCS client.
func WithProxyURL(proxyURL *url.URL) Option {
return func(o *options) {
o.proxyURL = proxyURL
}
}

type options struct {
secret *corev1.Secret
proxyURL *url.URL

// newCustomHTTPClient should create a new HTTP client for interacting with the GCS API.
// This is a test-only option required for mocking the real logic, which requires either
// a valid Google Service Account Key or ADC. Both are not available in tests.
// The real logic is implemented in the newHTTPClient function, which is used when
// constructing the default options object.
newCustomHTTPClient func(context.Context, *options) (*http.Client, error)
}

func newOptions() *options {
return &options{
newCustomHTTPClient: newHTTPClient,
}
}

// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application
// Credential environment variable or look for the Google Application Credential file.
func NewClient(ctx context.Context, secret *corev1.Secret) (*GCSClient, error) {
c := &GCSClient{}
if secret != nil {
client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"]))
func NewClient(ctx context.Context, opts ...Option) (*GCSClient, error) {
o := newOptions()
for _, opt := range opts {
opt(o)
}

var clientOpts []option.ClientOption

switch {
case o.secret != nil && o.proxyURL == nil:
clientOpts = append(clientOpts, option.WithCredentialsJSON(o.secret.Data["serviceaccount"]))
case o.proxyURL != nil:
httpClient, err := o.newCustomHTTPClient(ctx, o)
if err != nil {
return nil, err
}
c.Client = client
} else {
client, err := gcpstorage.NewClient(ctx)
clientOpts = append(clientOpts, option.WithHTTPClient(httpClient))
}

client, err := gcpstorage.NewClient(ctx, clientOpts...)
if err != nil {
return nil, err
}

return &GCSClient{client}, nil
}

// newHTTPClient creates a new HTTP client for interacting with Google Cloud APIs.
func newHTTPClient(ctx context.Context, o *options) (*http.Client, error) {
baseTransport := http.DefaultTransport.(*http.Transport).Clone()
if o.proxyURL != nil {
baseTransport.Proxy = http.ProxyURL(o.proxyURL)
}

var opts []option.ClientOption

if o.secret != nil {
// Here we can't use option.WithCredentialsJSON() because htransport.NewTransport()
// won't know what scopes to use and yield a 400 Bad Request error when retrieving
// the OAuth token. Instead we use google.CredentialsFromJSON(), which allows us to
// specify the GCS read-only scope.
creds, err := google.CredentialsFromJSON(ctx, o.secret.Data["serviceaccount"], gcpstorage.ScopeReadOnly)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create Google credentials from secret: %w", err)
}
c.Client = client
opts = append(opts, option.WithCredentials(creds))
}

transport, err := htransport.NewTransport(ctx, baseTransport, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create Google HTTP transport: %w", err)
}
return c, nil
return &http.Client{Transport: transport}, nil
}

// ValidateSecret validates the credential secret. The provided Secret may
105 changes: 99 additions & 6 deletions pkg/gcp/gcp_test.go
Original file line number Diff line number Diff line change
@@ -26,30 +26,36 @@ import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"testing"
"time"

"cloud.google.com/go/compute/metadata"
gcpstorage "cloud.google.com/go/storage"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
raw "google.golang.org/api/storage/v1"
"gotest.tools/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"google.golang.org/api/option"
testproxy "github.com/fluxcd/source-controller/tests/proxy"
)

const (
bucketName string = "test-bucket"
objectName string = "test.yaml"
objectGeneration int64 = 3
objectEtag string = "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk="
envGCSHost string = "STORAGE_EMULATOR_HOST"
envADC string = "GOOGLE_APPLICATION_CREDENTIALS"
)

var (
hc *http.Client
host string
client *gcpstorage.Client
close func()
err error
@@ -76,7 +82,7 @@ var (
)

func TestMain(m *testing.M) {
hc, close = newTestServer(func(w http.ResponseWriter, r *http.Request) {
hc, host, close = newTestServer(func(w http.ResponseWriter, r *http.Request) {
io.Copy(io.Discard, r.Body)
switch r.RequestURI {
case fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName):
@@ -140,12 +146,98 @@ func TestMain(m *testing.M) {
}

func TestNewClientWithSecretErr(t *testing.T) {
gcpClient, err := NewClient(context.Background(), secret.DeepCopy())
gcpClient, err := NewClient(context.Background(), WithSecret(secret.DeepCopy()))
t.Log(err)
assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value")
assert.Assert(t, gcpClient == nil)
}

func TestNewClientWithProxyErr(t *testing.T) {
_, envADCIsSet := os.LookupEnv(envADC)
assert.Assert(t, !envADCIsSet)
assert.Assert(t, !metadata.OnGCE())

tests := []struct {
name string
opts []Option
err string
}{
{
name: "invalid secret",
opts: []Option{WithSecret(secret.DeepCopy())},
err: "failed to create Google credentials from secret: invalid character 'e' looking for beginning of value",
},
{
name: "attempts default credentials",
err: "failed to create Google HTTP transport: google: could not find default credentials. See https://cloud.google.com/docs/authentication/external/set-up-adc for more information",
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
opts := append([]Option{WithProxyURL(&url.URL{})}, tt.opts...)
gcpClient, err := NewClient(context.Background(), opts...)
assert.Error(t, err, tt.err)
assert.Assert(t, gcpClient == nil)
})
}
}

func TestProxy(t *testing.T) {
proxyAddr, proxyPort := testproxy.New(t)

err := os.Setenv(envGCSHost, fmt.Sprintf("https://%s", host))
assert.NilError(t, err)
defer func() {
err := os.Unsetenv(envGCSHost)
assert.NilError(t, err)
}()

tests := []struct {
name string
proxyURL *url.URL
err string
}{
{
name: "with correct address",
proxyURL: &url.URL{Scheme: "http", Host: proxyAddr},
},
{
name: "with incorrect address",
proxyURL: &url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", proxyPort+1)},
err: "connection refused",
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
opts := []Option{WithProxyURL(tt.proxyURL)}
opts = append(opts, func(o *options) {
o.newCustomHTTPClient = func(ctx context.Context, o *options) (*http.Client, error) {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Proxy: http.ProxyURL(o.proxyURL),
}
return &http.Client{Transport: transport}, nil
}
})
gcpClient, err := NewClient(context.Background(), opts...)
assert.NilError(t, err)
assert.Assert(t, gcpClient != nil)
gcpClient.Client.SetRetry(gcpstorage.WithMaxAttempts(1))
exists, err := gcpClient.BucketExists(context.Background(), bucketName)
if tt.err != "" {
assert.ErrorContains(t, err, tt.err)
} else {
assert.NilError(t, err)
assert.Assert(t, exists)
}
})
}
}

func TestBucketExists(t *testing.T) {
gcpClient := &GCSClient{
Client: client,
@@ -272,16 +364,17 @@ func TestValidateSecret(t *testing.T) {
}
}

func newTestServer(handler func(w http.ResponseWriter, r *http.Request)) (*http.Client, func()) {
func newTestServer(handler func(w http.ResponseWriter, r *http.Request)) (*http.Client, string, func()) {
ts := httptest.NewTLSServer(http.HandlerFunc(handler))
host := ts.Listener.Addr().String()
tlsConf := &tls.Config{InsecureSkipVerify: true}
tr := &http.Transport{
TLSClientConfig: tlsConf,
DialTLS: func(netw, addr string) (net.Conn, error) {
return tls.Dial("tcp", ts.Listener.Addr().String(), tlsConf)
return tls.Dial("tcp", host, tlsConf)
},
}
return &http.Client{Transport: tr}, func() {
return &http.Client{Transport: tr}, host, func() {
tr.CloseIdleConnections()
ts.Close()
}

0 comments on commit 67f6cba

Please sign in to comment.