diff --git a/README.md b/README.md index 228461f..81e2464 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ Usage | `token` | The [Consul API token][Consul ACLs]. There is no default value. | `prefix`* | The source prefix including the data center, with optional destination prefix, separated by a colon (`:`). This option is additive and may be specified multiple times for multiple prefixes to replicate. | `exclude` | A prefix to exclude during replication. This option is additive and may be specified multiple times for multiple prefixes to exclude. +| `excludematch` | A substring that causes a key to be exclude when it matches the source during replication. This option is additive and may be specified multiple times for multiple substrings to exclude. | `wait` | The `minimum(:maximum)` to wait for stability before replicating, separated by a colon (`:`). If the optional maximum value is omitted, it is assumed to be 4x the required minimum value. There is no default value. | `retry` | The amount of time to wait if Consul returns an error when communicating with the API. The default value is 5 seconds. | `config` | The path to a configuration file or directory of configuration files on disk, relative to the current working directory. Values specified on the CLI take precedence over values specified in the configuration file. There is no default value. @@ -132,6 +133,10 @@ prefix { exclude { source = "vault/core/lock" } + +excludematch { + source = "_dc1_only_" +} ``` If a directory is given instead of a file, all files in the directory (recursively) will be merged in [lexical order](http://golang.org/pkg/path/filepath/#Walk). So if multiple files declare a "consul" key for instance, the last one will be used. diff --git a/cli.go b/cli.go index 47bf39e..0fdb717 100644 --- a/cli.go +++ b/cli.go @@ -263,6 +263,14 @@ func (cli *CLI) parseFlags(args []string) (*Config, bool, bool, error) { return nil }), "exclude", "") + flags.Var((funcVar)(func(s string) error { + if c.ExcludeMatches == nil { + c.ExcludeMatches = make([]*ExcludeMatch, 0, 1) + } + c.ExcludeMatches = append(c.ExcludeMatches, &ExcludeMatch{Source: s}) + return nil + }), "excludematch", "") + flags.Var((funcBoolVar)(func(b bool) error { c.Syslog.Enabled = b c.set("syslog") @@ -410,6 +418,8 @@ Options: omitted, it is assumed to be the same as the source -exclude= Provides a prefix to exclude from replication + -excludematch= Provides a path match to exclude from replication + -wait= Sets the 'minumum(:maximum)' amount of time to wait before replicating -retry= The amount of time to wait if Consul returns an diff --git a/cli_test.go b/cli_test.go index bf0d8f2..3e24464 100644 --- a/cli_test.go +++ b/cli_test.go @@ -306,6 +306,25 @@ func TestParseFlags_excludes(t *testing.T) { } } +func TestParseFlags_excludematches(t *testing.T) { + cli := NewCLI(ioutil.Discard, ioutil.Discard) + config, _, _, err := cli.parseFlags([]string{ + "-excludematch", "excludematched/", + }) + if err != nil { + t.Fatal(err) + } + + if len(config.ExcludeMatches) != 1 { + t.Fatal("expected 1 exclude match") + } + + excludematch := config.ExcludeMatches[0] + if excludematch.Source != "excludematched/" { + t.Errorf("expected %q to be %q", excludematch.Source, "excludematched/") + } +} + func TestParseFlags_syslog(t *testing.T) { cli := NewCLI(ioutil.Discard, ioutil.Discard) config, _, _, err := cli.parseFlags([]string{ diff --git a/config.go b/config.go index c40e36d..96ef8f3 100644 --- a/config.go +++ b/config.go @@ -37,6 +37,9 @@ type Config struct { // Excludes is the list of key prefixes to exclude from replication. Excludes []*Exclude `mapstructure:"exclude"` + // ExcludeMatches is the list of key match expressions to exclude from replication. + ExcludeMatches []*ExcludeMatch `mapstructure:"excludematch"` + // Auth is the HTTP basic authentication for communicating with Consul. Auth *AuthConfig `mapstructure:"auth"` @@ -129,6 +132,13 @@ func (c *Config) Copy() *Config { } } + o.ExcludeMatches = make([]*ExcludeMatch, len(c.ExcludeMatches)) + for i, p := range c.ExcludeMatches { + o.ExcludeMatches[i] = &ExcludeMatch{ + Source: p.Source, + } + } + o.Retry = c.Retry if c.Wait != nil { @@ -259,6 +269,18 @@ func (c *Config) Merge(o *Config) { } } + if o.ExcludeMatches != nil { + if c.ExcludeMatches == nil { + c.ExcludeMatches = []*ExcludeMatch{} + } + + for _, excludematch := range o.ExcludeMatches { + c.ExcludeMatches = append(c.ExcludeMatches, &ExcludeMatch{ + Source: excludematch.Source, + }) + } + } + if o.WasSet("retry") { c.Retry = o.Retry } @@ -426,6 +448,7 @@ func DefaultConfig() *Config { LogLevel: logLevel, Prefixes: []*Prefix{}, Excludes: []*Exclude{}, + ExcludeMatches: []*ExcludeMatch{}, Retry: 5 * time.Second, StatusDir: "service/consul-replicate/statuses", Wait: &config.WaitConfig{ @@ -532,6 +555,11 @@ type Exclude struct { Source string `mapstructure:"source"` } +// Exclude is a key path match to exclude from replication +type ExcludeMatch struct { + Source string `mapstructure:"source"` +} + // ParsePrefix parses a prefix of the format "source@dc:destination" into the // Prefix component. func ParsePrefix(s string) (*Prefix, error) { diff --git a/config_test.go b/config_test.go index 1e50d16..35bfaa9 100644 --- a/config_test.go +++ b/config_test.go @@ -257,6 +257,32 @@ func TestMerge_Excludes(t *testing.T) { } } +func TestMerge_ExcludeMatches(t *testing.T) { + config1 := testConfig(` + excludematch { + source = "foo" + } + `, t) + config2 := testConfig(` + excludematch { + source = "foo-2" + } + `, t) + config1.Merge(config2) + + if len(config1.ExcludeMatches) != 2 { + t.Fatalf("bad exclude matches %d", len(config1.ExcludeMatches)) + } + + if config1.ExcludeMatches[0].Source != "foo" { + t.Errorf("bad source: %#v", config1.ExcludeMatches[0].Source) + } + + if config1.ExcludeMatches[1].Source != "foo-2" { + t.Errorf("bad source: %#v", config1.ExcludeMatches[1].Source) + } +} + func TestMerge_wait(t *testing.T) { config1 := testConfig(` wait = "1s:1s" @@ -331,6 +357,7 @@ func TestParseConfig_correctValues(t *testing.T) { }, Prefixes: []*Prefix{}, Excludes: []*Exclude{}, + ExcludeMatches: []*ExcludeMatch{}, SSL: &SSLConfig{ Enabled: true, Verify: false, diff --git a/runner.go b/runner.go index 56c4097..2fd7f91 100644 --- a/runner.go +++ b/runner.go @@ -205,7 +205,7 @@ func (r *Runner) Run() error { // Replicate each prefix in a goroutine for _, prefix := range prefixes { - go r.replicate(prefix, r.config.Excludes, doneCh, errCh) + go r.replicate(prefix, r.config.Excludes, r.config.ExcludeMatches, doneCh, errCh) } var errs *multierror.Error @@ -273,7 +273,7 @@ func (r *Runner) get(prefix *Prefix) (*watch.View, bool) { // replicate performs replication into the current datacenter from the given // prefix. This function is designed to be called via a goroutine since it is // expensive and needs to be parallelized. -func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan struct{}, errCh chan error) { +func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, excludematches []*ExcludeMatch, doneCh chan struct{}, errCh chan error) { // Ensure we are not self-replicating info, err := r.clients.Consul().Agent().Self() if err != nil { @@ -334,6 +334,22 @@ func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan stru } } + // Ignore if the key falls under an exclude path match + if len(excludematches) > 0 { + excludematched := false + for _, excludematch := range excludematches { + if strings.Contains(pair.Path, excludematch.Source) { + log.Printf("[DEBUG] (runner) key %q contains %q, excluding", + pair.Path, excludematch.Source) + excludematched = true + } + } + + if excludematched { + continue + } + } + // Ignore if the modify index is old if pair.ModifyIndex <= status.LastReplicated { log.Printf("[DEBUG] (runner) skipping because %q is already "+ @@ -380,6 +396,7 @@ func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan stru } for _, key := range localKeys { excluded := false + excludematched := false // Ignore if the key falls under an excluded prefix if len(excludes) > 0 { @@ -393,7 +410,19 @@ func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan stru } } - if _, ok := usedKeys[key]; !ok && !excluded { + // Ignore if the key falls under an excluded match path + if !excluded && len(excludematches) > 0 { + sourceKey := strings.Replace(key, prefix.Destination, prefix.Source, -1) + for _, excludematch := range excludematches { + if strings.Contains(sourceKey, excludematch.Source) { + log.Printf("[DEBUG] (runner) key %q contains %q, excluding from deletes", + sourceKey, excludematch.Source) + excludematched = true + } + } + } + + if _, ok := usedKeys[key]; !ok && !excluded && !excludematched { if _, err := kv.Delete(key, nil); err != nil { errCh <- fmt.Errorf("failed to delete %q: %s", key, err) return diff --git a/runner_test.go b/runner_test.go index aa6502e..cf71d7e 100644 --- a/runner_test.go +++ b/runner_test.go @@ -18,6 +18,9 @@ func TestNewRunner_initialize(t *testing.T) { Excludes: []*Exclude{ &Exclude{Source: "3"}, }, + ExcludeMatches: []*ExcludeMatch{ + &ExcludeMatch{Source: "2"}, + }, } runner, err := NewRunner(config, once) diff --git a/scripts/integration.sh b/scripts/integration.sh index f0fc375..84babc4 100755 --- a/scripts/integration.sh +++ b/scripts/integration.sh @@ -90,6 +90,7 @@ $CONSUL_REPLICATE_BIN \ -consul $ADDRESS_DC2 \ -prefix "global@dc1:backup" \ -exclude "global/$EXCLUDED_KEY" \ + -excludematch "excluded_" \ -log-level $LOG_LEVEL & CONSUL_REPLICATE_PID=$! sleep 3 @@ -99,6 +100,7 @@ curl -sLo /dev/null -X PUT $ADDRESS_DC1/v1/kv/global/six -d "six" sleep 3 curl -sL $ADDRESS_DC2/v1/kv/backup/six | grep -q "c2l4" +echo "##Test Case #1" echo " Writing a key in DC2" curl -sLo /dev/null -X PUT $ADDRESS_DC2/v1/kv/backup/$EXCLUDED_KEY/nodelete -d "don't delete" sleep 3 @@ -110,6 +112,18 @@ sleep 3 echo " Checking key still exists in DC2" curl -sL $ADDRESS_DC2/v1/kv/backup/$EXCLUDED_KEY/nodelete | grep -q "ZG9uJ3QgZGVsZXRl" +echo "##Test Case #2" +echo " Writing a key in DC2" +curl -sLo /dev/null -X PUT $ADDRESS_DC2/v1/kv/backup/excluded_key -d "don't delete" +sleep 3 + +echo " Updating prefix in DC1" +curl -sLo /dev/null -X PUT $ADDRESS_DC1/v1/kv/global/parent_folder/other_folder/anykey -d "test data" +sleep 3 + +echo " Checking key still exists in DC2" +curl -sL $ADDRESS_DC2/v1/kv/backup/excluded_key | grep -q "ZG9uJ3QgZGVsZXRl" + rm -rf $DATADIR_DC1 rm -rf $DATADIR_DC2