singleflight/gcs: Add an option to set the stale threshold
This is useful if, for example, the user is on a slow network.

Signed-off-by: Derek Buitenhuis <[email protected]>
dwbuiten committed May 8, 2024
1 parent aa92813 commit 1814b41
Showing 10 changed files with 83 additions and 12 deletions.
6 changes: 3 additions & 3 deletions cmd/proxy/actions/app_proxy.go
@@ -101,7 +101,7 @@ func addProxyRoutes(

lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs)
checker := storage.WithChecker(s)
withSingleFlight, err := getSingleFlight(l, c, checker)
withSingleFlight, err := getSingleFlight(l, c, s, checker)
if err != nil {
return err
}
@@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a
l.logger.WithContext(ctx).Printf(format, v...)
}

func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) {
func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) {
switch c.SingleFlightType {
case "", "memory":
return stash.WithSingleflight, nil
@@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (
if c.StorageType != "gcp" {
return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
}
return stash.WithGCSLock, nil
return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s)
case "azureblob":
if c.StorageType != "azureblob" {
return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType)
4 changes: 4 additions & 0 deletions config.dev.toml
@@ -377,6 +377,10 @@ ShutdownTimeout = 60
# Max retries while acquiring the lock. Defaults to 10.
# Env override: ATHENS_REDIS_LOCK_MAX_RETRIES
MaxRetries = 10
[SingleFlight.GCP]
# Threshold, in seconds, after which an in-progress GCP upload is considered
# to have failed to release its lock and is treated as stale.
StaleThreshold = 120
[Storage]
# Only storage backends that are specified in Proxy.StorageType are required here
[Storage.CDN]
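The new field is tagged envconfig:"ATHENS_GCP_STALE_THRESHOLD" in pkg/config/singleflight.go, so, following the `Env override:` pattern used by the neighbouring settings in this file, the value can presumably also be supplied through the environment; a hedged example:

ATHENS_GCP_STALE_THRESHOLD=300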
11 changes: 11 additions & 0 deletions docs/content/configuration/storage.md
@@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red
SentinelPassword = "sekret"

Distributed lock options can be customised for redis sentinel as well, in a similar manner as described above for redis.


### Using GCP as a singleflight mechanism

The GCP singleflight mechanism does not require configuration and works out of the box. It has a
single option with which it can be customized:

[SingleFlight.GCP]
# Threshold, in seconds, after which an in-progress GCP upload is considered
# to have failed to release its lock and is treated as stale.
StaleThreshold = 120
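
For orientation, a hedged sketch of how this option pairs with the related top-level settings (key names assumed to mirror the Go config fields; per the change to cmd/proxy/actions/app_proxy.go in this commit, the gcp singleflight only works with the gcp storage type):

StorageType = "gcp"
SingleFlightType = "gcp"

[SingleFlight.GCP]
# Must be positive; stash.WithGCSLock rejects a threshold <= 0.
StaleThreshold = 120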
1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -181,6 +181,7 @@ func defaultConfig() *Config {
SentinelPassword: "sekret",
LockConfig: DefaultRedisLockConfig(),
},
GCP: DefaultGCPConfig(),
},
Index: &Index{
MySQL: &MySQL{
3 changes: 3 additions & 0 deletions pkg/config/config_test.go
@@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) {
LockConfig: DefaultRedisLockConfig(),
},
Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"},
GCP: DefaultGCPConfig(),
}

expConf := &Config{
@@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string {
} else if singleFlight.Etcd != nil {
envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd"
envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints
} else if singleFlight.GCP != nil {
envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold)
}
}
return envVars
13 changes: 13 additions & 0 deletions pkg/config/singleflight.go
@@ -7,6 +7,7 @@ type SingleFlight struct {
Etcd *Etcd
Redis *Redis
RedisSentinel *RedisSentinel
GCP *GCP
}

// Etcd holds client side configuration
@@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig {
MaxRetries: 10,
}
}

// GCP is the configuration for GCP locking.
type GCP struct {
StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"`
}

// DefaultGCPConfig returns the default GCP locking configuration.
func DefaultGCPConfig() *GCP {
return &GCP{
StaleThreshold: 120,
}
}
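
The threshold is stored as a plain integer number of seconds rather than a time.Duration; a small self-contained sketch of the conversion (this mirrors what stash.WithGCSLock does later in the commit, with the config type redeclared locally so the snippet stands alone):

package main

import (
	"fmt"
	"time"
)

// GCP mirrors the new pkg/config type; redeclared here so this sketch compiles on its own.
type GCP struct {
	StaleThreshold int // seconds
}

func main() {
	cfg := GCP{StaleThreshold: 120} // the DefaultGCPConfig value
	// The stash wrapper converts the integer seconds into a time.Duration:
	fmt.Println(time.Duration(cfg.StaleThreshold) * time.Second) // prints "2m0s"
}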
21 changes: 19 additions & 2 deletions pkg/stash/with_gcs.go
@@ -2,15 +2,32 @@ package stash

import (
"context"
"fmt"
"time"

"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ"
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/gcp"
)

// WithGCSLock returns a distributed singleflight
// using a GCS backend. See the config.toml documentation for details.
func WithGCSLock(s Stasher) Stasher {
return &gcsLock{s}
func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) {
if staleThreshold <= 0 {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold"))
}
// Since we *must* be using a GCP storage backend, we can abuse this
// fact to mutate it, so that we can get our threshold into Save().
// Your instincts are correct, this is kind of gross.
gs, ok := s.(*gcp.Storage)
if !ok {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage"))
}
gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second)
return func(s Stasher) Stasher {
return &gcsLock{s}
}, nil
}

type gcsLock struct {
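A minimal usage sketch, modeled on the updated tests in pkg/stash/with_gcs_test.go (strg is assumed to be a *gcp.Storage backend and base an existing stash.Stasher; the surrounding setup is elided):

wrapper, err := stash.WithGCSLock(120, strg) // threshold in seconds
if err != nil {
	// Returned for a non-positive threshold or a non-GCP storage backend.
	return err
}
stasher := wrapper(base) // Stash calls now go through the GCS-backed lock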
12 changes: 10 additions & 2 deletions pkg/stash/with_gcs_test.go
@@ -48,7 +48,11 @@ func TestWithGCS(t *testing.T) {
for i := 0; i < 5; i++ {
content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
s := WithGCSLock(ms)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
@@ -109,6 +113,11 @@ func TestWithGCSPartialFailure(t *testing.T) {
content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
fr := new(failReader)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
// We simulate a failure by manually passing an io.Reader that will fail.
err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content))
if err == nil {
@@ -117,7 +126,6 @@
}

// Now try a Stash. This should upload the missing files.
s := WithGCSLock(ms)
_, err = s.Stash(ctx, "stashmod", "v1.0.0")
if err != nil {
t.Fatal(err)
5 changes: 3 additions & 2 deletions pkg/storage/gcp/gcp.go
@@ -15,8 +15,9 @@ import (

// Storage implements the (./pkg/storage).Backend interface.
type Storage struct {
bucket *storage.BucketHandle
timeout time.Duration
bucket *storage.BucketHandle
timeout time.Duration
staleThreshold time.Duration
}

// New returns a new Storage instance backed by a Google Cloud Storage bucket.
19 changes: 16 additions & 3 deletions pkg/storage/gcp/saver.go
@@ -13,9 +13,9 @@ import (
googleapi "google.golang.org/api/googleapi"
)

// After how long we consider an "in_progress" metadata key stale,
// Fallback for how long we consider an "in_progress" metadata key stale,
// due to failure to remove it.
const inProgressStaleThreshold = 2 * time.Minute
const fallbackInProgressStaleThreshold = 2 * time.Minute

// Save uploads the module's .mod, .zip and .info files for a given version
// It expects a context, which can be provided using context.Background
@@ -48,6 +48,12 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte,
return innerErr
}

// SetStaleThreshold sets how long an in-progress lock's metadata may
// exist before we consider it stale.
func (s *Storage) SetStaleThreshold(threshold time.Duration) {
s.staleThreshold = threshold
}

func (s *Storage) save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
const op errors.Op = "gcp.save"
ctx, span := observ.StartSpan(ctx, op.String())
@@ -115,12 +121,19 @@ func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (
if err != nil {
return false, errors.E(op, err)
}
// If we have a config-set lock threshold, i.e. we are using the GCP
// singleflight backend, use it. Otherwise, use the fallback, which
// is arguably irrelevant when not using GCP for singleflighting.
threshold := fallbackInProgressStaleThreshold
if s.staleThreshold > 0 {
threshold = s.staleThreshold
}
if attrs.Metadata != nil {
_, ok := attrs.Metadata["in_progress"]
if ok {
// In case the final call to remove the metadata fails for some reason,
// we have a threshold after which we consider this to be stale.
if time.Since(attrs.Created) > inProgressStaleThreshold {
if time.Since(attrs.Created) > threshold {
return false, nil
}
return true, nil
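To make the staleness rule concrete, a self-contained sketch of the decision checkUploadInProgress now makes (isLockStale is a hypothetical helper, not part of the commit):

package main

import (
	"fmt"
	"time"
)

const fallbackInProgressStaleThreshold = 2 * time.Minute

// isLockStale is a hypothetical helper: prefer the configured threshold when
// one was set via SetStaleThreshold, otherwise fall back to two minutes.
func isLockStale(created time.Time, configured time.Duration) bool {
	threshold := fallbackInProgressStaleThreshold
	if configured > 0 {
		threshold = configured
	}
	return time.Since(created) > threshold
}

func main() {
	created := time.Now().Add(-3 * time.Minute) // lock metadata written 3 minutes ago
	fmt.Println(isLockStale(created, 0))             // true: exceeds the 2m fallback
	fmt.Println(isLockStale(created, 5*time.Minute)) // false: within the configured 5m
}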
