diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 87739942a..f27b80e8d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: go-version-file: 'go.mod' cache: true - name: Lint code - uses: golangci/golangci-lint-action@v5 + uses: golangci/golangci-lint-action@v6 with: version: ${{ env.GOLANGCI_LINT_VERSION }} skip-pkg-cache: true diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml index c0487cd4d..fa01eabb6 100644 --- a/.github/workflows/goreleaser.yml +++ b/.github/workflows/goreleaser.yml @@ -19,7 +19,7 @@ jobs: id: date run: echo "::set-output name=date::$(date -u '+%Y-%m-%d-%H:%M:%S-%Z')" - name: goreleaser - uses: goreleaser/goreleaser-action@v5 + uses: goreleaser/goreleaser-action@v6 with: version: latest args: release --rm-dist diff --git a/Makefile b/Makefile index 01a57d01c..ba3f64b88 100644 --- a/Makefile +++ b/Makefile @@ -74,7 +74,7 @@ test-unit-docker: ## run unit tests with docker .PHONY: test-e2e test-e2e: - cd e2etests && go test --tags e2etests + ./scripts/test_e2e.sh .PHONY: test-e2e-docker test-e2e-docker: diff --git a/cmd/proxy/actions/app.go b/cmd/proxy/actions/app.go index 74df44abf..acc686710 100644 --- a/cmd/proxy/actions/app.go +++ b/cmd/proxy/actions/app.go @@ -52,7 +52,6 @@ func App(logger *log.Logger, conf *config.Config) (http.Handler, error) { SSLRedirect: conf.ForceSSL, SSLProxyHeaders: map[string]string{"X-Forwarded-Proto": "https"}, }).Handler, - mw.ContentType, ) var subRouter *mux.Router diff --git a/cmd/proxy/actions/app_proxy.go b/cmd/proxy/actions/app_proxy.go index 777ad4ca6..0117bc9db 100644 --- a/cmd/proxy/actions/app_proxy.go +++ b/cmd/proxy/actions/app_proxy.go @@ -101,7 +101,7 @@ func addProxyRoutes( lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs) checker := storage.WithChecker(s) - withSingleFlight, err := getSingleFlight(l, c, checker) + withSingleFlight, err := getSingleFlight(l, c, s, checker) if err != nil { 
return err } @@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a l.logger.WithContext(ctx).Printf(format, v...) } -func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) { +func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) { switch c.SingleFlightType { case "", "memory": return stash.WithSingleflight, nil @@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) ( if c.StorageType != "gcp" { return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType) } - return stash.WithGCSLock, nil + return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s) case "azureblob": if c.StorageType != "azureblob" { return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType) diff --git a/cmd/proxy/actions/catalog.go b/cmd/proxy/actions/catalog.go index 769208f1e..96847b7c6 100644 --- a/cmd/proxy/actions/catalog.go +++ b/cmd/proxy/actions/catalog.go @@ -23,6 +23,7 @@ func catalogHandler(s storage.Backend) http.HandlerFunc { const op errors.Op = "actions.CatalogHandler" cs, isCataloger := s.(storage.Cataloger) f := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") if !isCataloger { w.WriteHeader(errors.KindNotImplemented) return diff --git a/cmd/proxy/actions/health.go b/cmd/proxy/actions/health.go index 8c34b33ca..53fc5f25e 100644 --- a/cmd/proxy/actions/health.go +++ b/cmd/proxy/actions/health.go @@ -5,5 +5,6 @@ import ( ) func healthHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) } diff --git a/cmd/proxy/actions/home.go b/cmd/proxy/actions/home.go index 8f41b979e..f73815444 100644 --- a/cmd/proxy/actions/home.go +++ 
b/cmd/proxy/actions/home.go @@ -125,7 +125,7 @@ func proxyHomeHandler(c *config.Config) http.HandlerFunc { w.WriteHeader(http.StatusInternalServerError) } - w.Header().Add("Content-Type", "text/html") + w.Header().Set("Content-Type", "text/html") w.WriteHeader(http.StatusOK) err = tmp.ExecuteTemplate(w, "home", templateData) diff --git a/cmd/proxy/actions/index.go b/cmd/proxy/actions/index.go index e747a7abb..1e4744e8d 100644 --- a/cmd/proxy/actions/index.go +++ b/cmd/proxy/actions/index.go @@ -23,6 +23,8 @@ func indexHandler(index index.Indexer) http.HandlerFunc { http.Error(w, err.Error(), errors.Kind(err)) return } + + w.Header().Set("Content-Type", "application/json; charset=utf-8") enc := json.NewEncoder(w) for _, meta := range list { if err = enc.Encode(meta); err != nil { diff --git a/cmd/proxy/actions/readiness.go b/cmd/proxy/actions/readiness.go index 48a900d53..515a603bf 100644 --- a/cmd/proxy/actions/readiness.go +++ b/cmd/proxy/actions/readiness.go @@ -9,6 +9,7 @@ import ( func getReadinessHandler(s storage.Backend) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if _, err := s.List(r.Context(), "github.com/gomods/athens"); err != nil { + w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(http.StatusInternalServerError) } } diff --git a/cmd/proxy/actions/robots.go b/cmd/proxy/actions/robots.go index b7edb1358..67ad8e3cf 100644 --- a/cmd/proxy/actions/robots.go +++ b/cmd/proxy/actions/robots.go @@ -9,6 +9,7 @@ import ( // robotsHandler implements GET baseURL/robots.txt. 
func robotsHandler(c *config.Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") http.ServeFile(w, r, c.RobotsFile) } } diff --git a/cmd/proxy/actions/version.go b/cmd/proxy/actions/version.go index a0cba0120..7ca732203 100644 --- a/cmd/proxy/actions/version.go +++ b/cmd/proxy/actions/version.go @@ -9,4 +9,6 @@ import ( func versionHandler(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(build.Data()) + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(http.StatusOK) } diff --git a/config.dev.toml b/config.dev.toml index d04df2fea..aab468dfc 100755 --- a/config.dev.toml +++ b/config.dev.toml @@ -377,6 +377,10 @@ ShutdownTimeout = 60 # Max retries while acquiring the lock. Defaults to 10. # Env override: ATHENS_REDIS_LOCK_MAX_RETRIES MaxRetries = 10 + [SingleFlight.GCP] + # Threshold for how long to wait in seconds for an in-progress GCP upload to + # be considered to have failed to unlock. + StaleThreshold = 120 [Storage] # Only storage backends that are specified in Proxy.StorageType are required here [Storage.CDN] diff --git a/docs/content/configuration/storage.md b/docs/content/configuration/storage.md index ab63ea653..81ef957d2 100644 --- a/docs/content/configuration/storage.md +++ b/docs/content/configuration/storage.md @@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red SentinelPassword = "sekret" Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis. + + +### Using GCP as a singleflight mechanism + +The GCP singleflight mechanism does not require configuration, and works out of the box. It has a +single option with which it can be customized: + + [SingleFlight.GCP] + # Threshold for how long to wait in seconds for an in-progress GCP upload to + # be considered to have failed to unlock. 
+ StaleThreshold = 120 diff --git a/pkg/config/config.go b/pkg/config/config.go index 9caed7fee..b591b949a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -181,6 +181,7 @@ func defaultConfig() *Config { SentinelPassword: "sekret", LockConfig: DefaultRedisLockConfig(), }, + GCP: DefaultGCPConfig(), }, Index: &Index{ MySQL: &MySQL{ diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 6c741a04a..b57445e7d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) { LockConfig: DefaultRedisLockConfig(), }, Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"}, + GCP: DefaultGCPConfig(), } expConf := &Config{ @@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string { } else if singleFlight.Etcd != nil { envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd" envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints + } else if singleFlight.GCP != nil { + envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold) } } return envVars diff --git a/pkg/config/singleflight.go b/pkg/config/singleflight.go index 69049b10a..b3b6fe048 100644 --- a/pkg/config/singleflight.go +++ b/pkg/config/singleflight.go @@ -7,6 +7,7 @@ type SingleFlight struct { Etcd *Etcd Redis *Redis RedisSentinel *RedisSentinel + GCP *GCP } // Etcd holds client side configuration @@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig { MaxRetries: 10, } } + +// GCP is the configuration for GCP locking. +type GCP struct { + StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"` +} + +// DefaultGCPConfig returns the default GCP locking configuration. 
+func DefaultGCPConfig() *GCP { + return &GCP{ + StaleThreshold: 120, + } +} diff --git a/pkg/download/latest.go b/pkg/download/latest.go index 6e5aad194..357f00054 100644 --- a/pkg/download/latest.go +++ b/pkg/download/latest.go @@ -17,6 +17,7 @@ const PathLatest = "/{module:.+}/@latest" func LatestHandler(dp Protocol, lggr log.Entry, df *mode.DownloadFile) http.Handler { const op errors.Op = "download.LatestHandler" f := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") mod, err := paths.GetModule(r) if err != nil { lggr.SystemErr(errors.E(op, err)) diff --git a/pkg/download/list.go b/pkg/download/list.go index 405fbca47..dd94e8c92 100644 --- a/pkg/download/list.go +++ b/pkg/download/list.go @@ -18,6 +18,7 @@ const PathList = "/{module:.+}/@v/list" func ListHandler(dp Protocol, lggr log.Entry, df *mode.DownloadFile) http.Handler { const op errors.Op = "download.ListHandler" f := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") mod, err := paths.GetModule(r) if err != nil { lggr.SystemErr(errors.E(op, err)) diff --git a/pkg/download/version_info.go b/pkg/download/version_info.go index 3a6c96035..d68adb523 100644 --- a/pkg/download/version_info.go +++ b/pkg/download/version_info.go @@ -15,6 +15,7 @@ const PathVersionInfo = "/{module:.+}/@v/{version}.info" func InfoHandler(dp Protocol, lggr log.Entry, df *mode.DownloadFile) http.Handler { const op errors.Op = "download.InfoHandler" f := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") mod, ver, err := getModuleParams(r, op) if err != nil { lggr.SystemErr(err) diff --git a/pkg/download/version_module.go b/pkg/download/version_module.go index 1a1002308..08b423da7 100644 --- a/pkg/download/version_module.go +++ b/pkg/download/version_module.go @@ -15,6 +15,7 @@ const PathVersionModule = "/{module:.+}/@v/{version}.mod" func 
ModuleHandler(dp Protocol, lggr log.Entry, df *mode.DownloadFile) http.Handler { const op errors.Op = "download.VersionModuleHandler" f := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") mod, ver, err := getModuleParams(r, op) if err != nil { err = errors.E(op, errors.M(mod), errors.V(ver), err) diff --git a/pkg/stash/with_gcs.go b/pkg/stash/with_gcs.go index 3e2386e5c..788251f1c 100644 --- a/pkg/stash/with_gcs.go +++ b/pkg/stash/with_gcs.go @@ -2,15 +2,32 @@ package stash import ( "context" + "fmt" + "time" "github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/observ" + "github.com/gomods/athens/pkg/storage" + "github.com/gomods/athens/pkg/storage/gcp" ) // WithGCSLock returns a distributed singleflight // using a GCS backend. See the config.toml documentation for details. -func WithGCSLock(s Stasher) Stasher { - return &gcsLock{s} +func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) { + if staleThreshold <= 0 { + return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold")) + } + // Since we *must* be using a GCP storage backend, we can abuse this + // fact to mutate it, so that we can get our threshold into Save(). + // Your instincts are correct, this is kind of gross. 
+ gs, ok := s.(*gcp.Storage) + if !ok { + return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage")) + } + gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second) + return func(s Stasher) Stasher { + return &gcsLock{s} + }, nil } type gcsLock struct { diff --git a/pkg/stash/with_gcs_test.go b/pkg/stash/with_gcs_test.go index 3a309cac0..d738a26d0 100644 --- a/pkg/stash/with_gcs_test.go +++ b/pkg/stash/with_gcs_test.go @@ -3,6 +3,7 @@ package stash import ( "bytes" "context" + "fmt" "io" "os" "strings" @@ -17,6 +18,12 @@ import ( "golang.org/x/sync/errgroup" ) +type failReader int + +func (f *failReader) Read([]byte) (int, error) { + return 0, fmt.Errorf("failure") +} + // TestWithGCS requires a real GCP backend implementation // and it will ensure that saving to modules at the same time // is done synchronously so that only the first module gets saved. @@ -41,7 +48,11 @@ func TestWithGCS(t *testing.T) { for i := 0; i < 5; i++ { content := uuid.New().String() ms := &mockGCPStasher{strg, content} - s := WithGCSLock(ms) + gs, err := WithGCSLock(120, strg) + if err != nil { + t.Fatal(err) + } + s := gs(ms) eg.Go(func() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -79,6 +90,72 @@ func TestWithGCS(t *testing.T) { } } +// TestWithGCSPartialFailure requires a real GCP backend implementation +// and ensures that if one of the non-singleflight-lock files fails to +// upload, that the cache does not remain poisoned. 
+func TestWithGCSPartialFailure(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + const ( + mod = "stashmod" + ver = "v1.0.0" + ) + strg := getStorage(t) + strg.Delete(ctx, mod, ver) + defer strg.Delete(ctx, mod, ver) + + // sanity check + _, err := strg.GoMod(ctx, mod, ver) + if !errors.Is(err, errors.KindNotFound) { + t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err) + } + + content := uuid.New().String() + ms := &mockGCPStasher{strg, content} + fr := new(failReader) + gs, err := WithGCSLock(120, strg) + if err != nil { + t.Fatal(err) + } + s := gs(ms) + // We simulate a failure by manually passing an io.Reader that will fail. + err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content)) + if err == nil { + // We *want* to fail. + t.Fatal(err) + } + + // Now try a Stash. This should upload the missing files. + _, err = s.Stash(ctx, "stashmod", "v1.0.0") + if err != nil { + t.Fatal(err) + } + + info, err := strg.Info(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + modContent, err := strg.GoMod(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + zip, err := strg.Zip(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + defer zip.Close() + zipContent, err := io.ReadAll(zip) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(info, modContent) { + t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent)) + } + if !bytes.Equal(info, zipContent) { + t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent)) + } +} + // mockGCPStasher is like mockStasher // but leverages in memory storage // so that redis can determine diff --git a/pkg/storage/gcp/gcp.go b/pkg/storage/gcp/gcp.go index a15d25d1d..b502fe588 100644 --- a/pkg/storage/gcp/gcp.go +++ b/pkg/storage/gcp/gcp.go @@ -15,8 +15,9 @@ import ( // Storage implements 
the (./pkg/storage).Backend interface. type Storage struct { - bucket *storage.BucketHandle - timeout time.Duration + bucket *storage.BucketHandle + timeout time.Duration + staleThreshold time.Duration } // New returns a new Storage instance backed by a Google Cloud Storage bucket. diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go index 4298aaa17..b430d64d7 100644 --- a/pkg/storage/gcp/saver.go +++ b/pkg/storage/gcp/saver.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "time" "cloud.google.com/go/storage" "github.com/gomods/athens/pkg/config" @@ -12,6 +13,10 @@ import ( googleapi "google.golang.org/api/googleapi" ) +// Fallback for how long we consider an "in_progress" metadata key stale, +// due to failure to remove it. +const fallbackInProgressStaleThreshold = 2 * time.Minute + // Save uploads the module's .mod, .zip and .info files for a given version // It expects a context, which can be provided using context.Background // from the standard library until context has been threaded down the stack. @@ -20,40 +25,146 @@ import ( // Uploaded files are publicly accessible in the storage bucket as per // an ACL rule. func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { - const op errors.Op = "gcp.Save" + const op errors.Op = "gcp.save" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() gomodPath := config.PackageVersionedName(module, version, "mod") - err := s.upload(ctx, gomodPath, bytes.NewReader(mod)) - if err != nil { + innerErr := s.save(ctx, module, version, mod, zip, info) + if errors.Is(innerErr, errors.KindAlreadyExists) { + // Cache hit. + return errors.E(op, innerErr) + } + // No cache hit. Remove the metadata lock if it is there. 
+ inProgress, outerErr := s.checkUploadInProgress(ctx, gomodPath) + if outerErr != nil { + return errors.E(op, outerErr) + } + if inProgress { + outerErr = s.removeInProgressMetadata(ctx, gomodPath) + if outerErr != nil { + return errors.E(op, outerErr) + } + } + return innerErr +} + +// SetStaleThreshold sets the threshold of how long we consider +// a lock metadata stale after. +func (s *Storage) SetStaleThreshold(threshold time.Duration) { + s.staleThreshold = threshold +} + +func (s *Storage) save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { + const op errors.Op = "gcp.save" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + gomodPath := config.PackageVersionedName(module, version, "mod") + seenAlreadyExists := 0 + err := s.upload(ctx, gomodPath, bytes.NewReader(mod), true) + // If it already exists, check the object metadata to see if the + // other two are still uploading in progress somewhere else. If they + // are, return a cache hit. If not, continue on to the other two, + // and only return a cache hit if all three exist. + if errors.Is(err, errors.KindAlreadyExists) { + inProgress, progressErr := s.checkUploadInProgress(ctx, gomodPath) + if progressErr != nil { + return errors.E(op, progressErr) + } + if inProgress { + // err is known to be errors.KindAlreadyExists at this point, so + // this is a cache hit return. 
+ return errors.E(op, err) + } + seenAlreadyExists++ + } else if err != nil { + // Other errors return errors.E(op, err) } zipPath := config.PackageVersionedName(module, version, "zip") - err = s.upload(ctx, zipPath, zip) - if err != nil { + err = s.upload(ctx, zipPath, zip, false) + if errors.Is(err, errors.KindAlreadyExists) { + seenAlreadyExists++ + } else if err != nil { return errors.E(op, err) } infoPath := config.PackageVersionedName(module, version, "info") - err = s.upload(ctx, infoPath, bytes.NewReader(info)) + err = s.upload(ctx, infoPath, bytes.NewReader(info), false) + // Have all three returned errors.KindAlreadyExists? + if errors.Is(err, errors.KindAlreadyExists) { + if seenAlreadyExists == 2 { + return errors.E(op, err) + } + } else if err != nil { + return errors.E(op, err) + } + return nil +} + +func (s *Storage) removeInProgressMetadata(ctx context.Context, gomodPath string) error { + const op errors.Op = "gcp.removeInProgressMetadata" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + _, err := s.bucket.Object(gomodPath).Update(ctx, storage.ObjectAttrsToUpdate{ + Metadata: map[string]string{}, + }) if err != nil { return errors.E(op, err) } return nil } -func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) error { +func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (bool, error) { + const op errors.Op = "gcp.checkUploadInProgress" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + attrs, err := s.bucket.Object(gomodPath).Attrs(ctx) + if err != nil { + return false, errors.E(op, err) + } + // If we have a config-set lock threshold, i.e. we are using the GCP + // singleflight backend, use it. Otherwise, use the fallback, which + // is arguably irrelevant when not using GCP for singleflighting. 
+ threshold := fallbackInProgressStaleThreshold + if s.staleThreshold > 0 { + threshold = s.staleThreshold + } + if attrs.Metadata != nil { + _, ok := attrs.Metadata["in_progress"] + if ok { + // In case the final call to remove the metadata fails for some reason, + // we have a threshold after which we consider this to be stale. + if time.Since(attrs.Created) > threshold { + return false, nil + } + return true, nil + } + } + return false, nil +} + +func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, first bool) error { const op errors.Op = "gcp.upload" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + wc := s.bucket.Object(path).If(storage.Conditions{ DoesNotExist: true, - }).NewWriter(ctx) + }).NewWriter(cancelCtx) + + // We set this metadata only for the first of the three files uploaded, + // for use as a singleflight lock. + if first { + wc.ObjectAttrs.Metadata = make(map[string]string) + wc.ObjectAttrs.Metadata["in_progress"] = "true" + } // NOTE: content type is auto detected on GCP side and ACL defaults to public // Once we support private storage buckets this may need refactoring // unless there is a way to set the default perms in the project. if _, err := io.Copy(wc, stream); err != nil { - _ = wc.Close() + // Purposely do not close it to avoid creating a partial file. return err } diff --git a/pkg/storage/s3/s3.go b/pkg/storage/s3/s3.go index 8d648b120..3ae2d1732 100644 --- a/pkg/storage/s3/s3.go +++ b/pkg/storage/s3/s3.go @@ -40,10 +40,6 @@ func New(s3Conf *config.S3Config, timeout time.Duration, options ...func(*aws.Co return nil, errors.E(op, err) } - // Remove anonymous credentials from the default config so that - // session.NewSession can auto-resolve credentials from role, profile, env etc. 
- awsConfig.Credentials = nil - for _, o := range options { o(&awsConfig) } diff --git a/scripts/test_e2e.sh b/scripts/test_e2e.sh new file mode 100755 index 000000000..c3bf1b37a --- /dev/null +++ b/scripts/test_e2e.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +# test_e2e.sh + +# Run the e2e tests with the race detector enabled +set -xeuo pipefail +cd e2etests && go test --tags e2etests -race