Skip to content

Commit

Permalink
[3.2.1 Backport] CBG-4089: Do not remove loaded database on transient…
Browse files Browse the repository at this point in the history
… fetch error (#7132)

Co-authored-by: Adam Fraser <[email protected]>
  • Loading branch information
bbrks and adamcfraser authored Oct 10, 2024
1 parent ef737b5 commit 8d0bb46
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 51 deletions.
8 changes: 6 additions & 2 deletions base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,12 @@ func IsTemporaryKvError(err error) bool {
return false
}
// define list of temporary errors
temporaryKVError := []error{ErrTimeout, gocb.ErrAmbiguousTimeout, gocb.ErrUnambiguousTimeout,
gocb.ErrOverload, gocb.ErrTemporaryFailure, gocb.ErrCircuitBreakerOpen}
temporaryKVError := []error{
ErrTimeout, // Sync Gateway client-side timeout
gocb.ErrTimeout, // SDK op timeout. Wrapped by gocb.ErrAmbiguousTimeout, gocb.ErrUnambiguousTimeout,
gocb.ErrOverload, // SDK client-side pipeline queue full, request was not submitted to server
gocb.ErrTemporaryFailure, // Couchbase Server returned temporary failure error
gocb.ErrCircuitBreakerOpen} // SDK client-side circuit breaker blocked request

// iterate through to check incoming error is one of them
for _, tempKVErr := range temporaryKVError {
Expand Down
85 changes: 85 additions & 0 deletions rest/adminapitest/admin_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,91 @@ func TestMultipleBucketWithBadDbConfigScenario3(t *testing.T) {

}

// TestConfigPollingRemoveDatabase:
//
// Validates db is removed when polling detects that the config is not found
func TestConfigPollingRemoveDatabase(t *testing.T) {

base.SetUpTestLogging(t, base.LevelInfo, base.KeyConfig)
testCases := []struct {
useXattrConfig bool
}{
{
useXattrConfig: false,
},
{
useXattrConfig: true,
},
}
for _, testCase := range testCases {
t.Run(fmt.Sprintf("xattrConfig_%v", testCase.useXattrConfig), func(t *testing.T) {

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: base.GetTestBucket(t),
PersistentConfig: true,
MutateStartupConfig: func(config *rest.StartupConfig) {
// configure the interval time to pick up new configs from the bucket to every 50 milliseconds
config.Bootstrap.ConfigUpdateFrequency = base.NewConfigDuration(50 * time.Millisecond)
},
DatabaseConfig: nil,
UseXattrConfig: testCase.useXattrConfig,
})
defer rt.Close()

ctx := base.TestCtx(t)
// create a new db
dbName := "db1"
dbConfig := rt.NewDbConfig()
dbConfig.Name = dbName
dbConfig.BucketConfig.Bucket = base.StringPtr(rt.CustomTestBucket.GetName())
resp := rt.CreateDatabase(dbName, dbConfig)
rest.RequireStatus(t, resp, http.StatusCreated)

// Validate that db is loaded
_, err := rt.ServerContext().GetDatabase(ctx, dbName)
require.NoError(t, err)

// Force timeouts - dev-time only test enhancement to validate CBG-3947, requires manual "leaky bootstrap" handling
// To enable:
// - Add "var ForceTimeouts bool" to bootstrap.go
// - In CouchbaseCluster.GetMetadataDocument, add the following after loadConfig:
// if ForceTimeouts {
// return 0, gocb.ErrTimeout
// }
// - enable the code block below
/*
base.ForceTimeouts = true
// Wait to ensure database doesn't disappear
err = rt.WaitForConditionWithOptions(func() bool {
_, err := rt.ServerContext().GetActiveDatabase(dbName)
return errors.Is(err, base.ErrNotFound)
}, 200, 50)
require.Error(t, err)
base.ForceTimeouts = false
*/

// Delete the config directly
rt.RemoveDbConfigFromBucket("db1", rt.CustomTestBucket.GetName())

// assert that the database is unloaded
err = rt.WaitForConditionWithOptions(func() bool {
_, err := rt.ServerContext().GetActiveDatabase(dbName)
return errors.Is(err, base.ErrNotFound)

}, 200, 1000)
require.NoError(t, err)

// assert that a request to the database fails with correct error message
resp = rt.SendAdminRequest(http.MethodGet, "/db1/_config", "")
rest.RequireStatus(t, resp, http.StatusNotFound)
assert.Contains(t, resp.Body.String(), "no such database")
})
}
}

func TestResyncStopUsingDCPStream(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
// This test requires a gocb bucket
Expand Down
111 changes: 62 additions & 49 deletions rest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,17 +1635,26 @@ func (sc *ServerContext) fetchAndLoadConfigs(ctx context.Context, isInitialStart
sc.lock.Lock()
defer sc.lock.Unlock()
for _, dbName := range deletedDatabases {
dbc, ok := sc.databases_[dbName]
if !ok {
base.DebugfCtx(ctx, base.KeyConfig, "Database %q already removed from server context after acquiring write lock - do not need to remove not removing database", base.MD(dbName))
continue
}
// It's possible that the "deleted" database was not written to the server until after sc.FetchConfigs had returned...
// we'll need to pay for the cost of getting the config again now that we've got the write lock to double-check this db is definitely ok to remove...
found, _, err := sc._fetchDatabase(ctx, dbName)
if err != nil {
base.InfofCtx(ctx, base.KeyConfig, "Error fetching config for database %q to check whether we need to remove it: %v", dbName, err)
found, _, getConfigErr := sc._fetchDatabaseFromBucket(ctx, dbc.Bucket.GetName(), dbName)
if found && getConfigErr == nil {
base.DebugfCtx(ctx, base.KeyConfig, "Found config for database %q after acquiring write lock - not removing database", base.MD(dbName))
continue
}
if base.IsTemporaryKvError(getConfigErr) {
base.InfofCtx(ctx, base.KeyConfig, "Transient error fetching config for database %q to check whether we need to remove it, will not be removed: %v", base.MD(dbName), getConfigErr)
continue
}

if !found {
base.InfofCtx(ctx, base.KeyConfig, "Database %q was running on this node, but config was not found on the server - removing database", base.MD(dbName))
base.InfofCtx(ctx, base.KeyConfig, "Database %q was running on this node, but config was not found on the server - removing database (%v)", base.MD(dbName), getConfigErr)
sc._removeDatabase(ctx, dbName)
} else {
base.DebugfCtx(ctx, base.KeyConfig, "Found config for database %q after acquiring write lock - not removing database", base.MD(dbName))
}
}

Expand Down Expand Up @@ -1753,56 +1762,60 @@ func (sc *ServerContext) fetchDatabase(ctx context.Context, dbName string) (foun
return sc._fetchDatabase(ctx, dbName)
}

func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (found bool, dbConfig *DatabaseConfig, err error) {
// loop code moved to foreachDbConfig
var cnf DatabaseConfig
callback := func(bucket string) (exit bool, err error) {
cas, err := sc.BootstrapContext.GetConfig(ctx, bucket, sc.Config.Bootstrap.ConfigGroupID, dbName, &cnf)
if err == base.ErrNotFound {
base.DebugfCtx(ctx, base.KeyConfig, "%q did not contain config in group %q", bucket, sc.Config.Bootstrap.ConfigGroupID)
return false, err
}
if err != nil {
base.DebugfCtx(ctx, base.KeyConfig, "unable to fetch config in group %q from bucket %q: %v", sc.Config.Bootstrap.ConfigGroupID, bucket, err)
return false, err
}
func (sc *ServerContext) _fetchDatabaseFromBucket(ctx context.Context, bucket string, dbName string) (found bool, cnf DatabaseConfig, err error) {

if cnf.Name == "" {
cnf.Name = bucket
}
cas, err := sc.BootstrapContext.GetConfig(ctx, bucket, sc.Config.Bootstrap.ConfigGroupID, dbName, &cnf)
if errors.Is(err, base.ErrNotFound) {
base.DebugfCtx(ctx, base.KeyConfig, "%q did not contain config in group %q", bucket, sc.Config.Bootstrap.ConfigGroupID)
return false, cnf, err
}
if err != nil {
base.DebugfCtx(ctx, base.KeyConfig, "unable to fetch config in group %q from bucket %q: %v", sc.Config.Bootstrap.ConfigGroupID, bucket, err)
return false, cnf, err
}

if cnf.Name != dbName {
base.TracefCtx(ctx, base.KeyConfig, "%q did not contain config in group %q for db %q", bucket, sc.Config.Bootstrap.ConfigGroupID, dbName)
return false, err
}
if cnf.Name == "" {
cnf.Name = bucket
}

cnf.cfgCas = cas
if cnf.Name != dbName {
base.TracefCtx(ctx, base.KeyConfig, "%q did not contain config in group %q for db %q", bucket, sc.Config.Bootstrap.ConfigGroupID, dbName)
return false, cnf, err
}

// TODO: This code is mostly copied from FetchConfigs, move into shared function with DbConfig REST API work?
cnf.cfgCas = cas

// inherit properties the bootstrap config
cnf.CACertPath = sc.Config.Bootstrap.CACertPath
// inherit properties the bootstrap config
cnf.CACertPath = sc.Config.Bootstrap.CACertPath

// We need to check for corruption in the database config (CC. CBG-3292). If the fetched config doesn't match the
// bucket name we got the config from we need to maker this db context as corrupt. Then remove the context and
// in memory representation on the server context.
if bucket != cnf.GetBucketName() {
sc._handleInvalidDatabaseConfig(ctx, bucket, cnf)
return true, fmt.Errorf("mismatch in persisted database bucket name %q vs the actual bucket name %q. Please correct db %q's config, groupID %q.", base.MD(cnf.Bucket), base.MD(bucket), base.MD(cnf.Name), base.MD(sc.Config.Bootstrap.ConfigGroupID))
}
bucketCopy := bucket
// no corruption detected carry on as usual
cnf.Bucket = &bucketCopy
// We need to check for corruption in the database config (CC. CBG-3292). If the fetched config doesn't match the
// bucket name we got the config from we need to maker this db context as corrupt. Then remove the context and
// in memory representation on the server context.
if bucket != cnf.GetBucketName() {
sc._handleInvalidDatabaseConfig(ctx, bucket, cnf)
return true, cnf, fmt.Errorf("mismatch in persisted database bucket name %q vs the actual bucket name %q. Please correct db %q's config, groupID %q.", base.MD(cnf.Bucket), base.MD(bucket), base.MD(cnf.Name), base.MD(sc.Config.Bootstrap.ConfigGroupID))
}
bucketCopy := bucket
// no corruption detected carry on as usual
cnf.Bucket = &bucketCopy

// any authentication fields defined on the dbconfig take precedence over any in the bootstrap config
if cnf.Username == "" && cnf.Password == "" && cnf.CertPath == "" && cnf.KeyPath == "" {
cnf.Username = sc.Config.Bootstrap.Username
cnf.Password = sc.Config.Bootstrap.Password
cnf.CertPath = sc.Config.Bootstrap.X509CertPath
cnf.KeyPath = sc.Config.Bootstrap.X509KeyPath
}
base.TracefCtx(ctx, base.KeyConfig, "Got database config %s for bucket %q with cas %d and groupID %q", base.MD(dbName), base.MD(bucket), cas, base.MD(sc.Config.Bootstrap.ConfigGroupID))
return true, nil
// any authentication fields defined on the dbconfig take precedence over any in the bootstrap config
if cnf.Username == "" && cnf.Password == "" && cnf.CertPath == "" && cnf.KeyPath == "" {
cnf.Username = sc.Config.Bootstrap.Username
cnf.Password = sc.Config.Bootstrap.Password
cnf.CertPath = sc.Config.Bootstrap.X509CertPath
cnf.KeyPath = sc.Config.Bootstrap.X509KeyPath
}
base.TracefCtx(ctx, base.KeyConfig, "Got database config %s for bucket %q with cas %d and groupID %q", base.MD(dbName), base.MD(bucket), cas, base.MD(sc.Config.Bootstrap.ConfigGroupID))
return true, cnf, nil
}

func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (found bool, dbConfig *DatabaseConfig, err error) {
var cnf DatabaseConfig
callback := func(bucket string) (exit bool, callbackErr error) {
var foundInBucket bool
foundInBucket, cnf, callbackErr = sc._fetchDatabaseFromBucket(ctx, bucket, dbName)
return foundInBucket, callbackErr
}

err = sc.findBucketWithCallback(callback)
Expand Down
2 changes: 2 additions & 0 deletions rest/utilities_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type RestTesterConfig struct {
syncGatewayVersion *base.ComparableBuildVersion // alternate version of Sync Gateway to use on startup
allowDbConfigEnvVars *bool
maxConcurrentRevs *int
UseXattrConfig bool
}

type collectionConfiguration uint8
Expand Down Expand Up @@ -234,6 +235,7 @@ func (rt *RestTester) Bucket() base.Bucket {
sc.Bootstrap.ServerTLSSkipVerify = base.BoolPtr(base.TestTLSSkipVerify())
sc.Unsupported.Serverless.Enabled = &rt.serverless
sc.Unsupported.AllowDbConfigEnvVars = rt.RestTesterConfig.allowDbConfigEnvVars
sc.Unsupported.UseXattrConfig = &rt.UseXattrConfig
sc.Replicator.MaxConcurrentRevs = rt.RestTesterConfig.maxConcurrentRevs
if rt.serverless {
if !rt.PersistentConfig {
Expand Down

0 comments on commit 8d0bb46

Please sign in to comment.