Skip to content

Better exclude #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Usage
| `token` | The [Consul API token][Consul ACLs]. There is no default value.
| `prefix`* | The source prefix including the data center, with optional destination prefix, separated by a colon (`:`). This option is additive and may be specified multiple times for multiple prefixes to replicate.
| `exclude` | A prefix to exclude during replication. This option is additive and may be specified multiple times for multiple prefixes to exclude.
| `excludematch` | A substring that causes a key to be exclude when it matches the source during replication. This option is additive and may be specified multiple times for multiple substrings to exclude.
| `wait` | The `minimum(:maximum)` to wait for stability before replicating, separated by a colon (`:`). If the optional maximum value is omitted, it is assumed to be 4x the required minimum value. There is no default value.
| `retry` | The amount of time to wait if Consul returns an error when communicating with the API. The default value is 5 seconds.
| `config` | The path to a configuration file or directory of configuration files on disk, relative to the current working directory. Values specified on the CLI take precedence over values specified in the configuration file. There is no default value.
Expand Down Expand Up @@ -132,6 +133,10 @@ prefix {
exclude {
source = "vault/core/lock"
}

excludematch {
source = "_dc1_only_"
}
```

If a directory is given instead of a file, all files in the directory (recursively) will be merged in [lexical order](http://golang.org/pkg/path/filepath/#Walk). So if multiple files declare a "consul" key for instance, the last one will be used.
Expand Down
10 changes: 10 additions & 0 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ func (cli *CLI) parseFlags(args []string) (*Config, bool, bool, error) {
return nil
}), "exclude", "")

flags.Var((funcVar)(func(s string) error {
if c.ExcludeMatches == nil {
c.ExcludeMatches = make([]*ExcludeMatch, 0, 1)
}
c.ExcludeMatches = append(c.ExcludeMatches, &ExcludeMatch{Source: s})
return nil
}), "excludematch", "")

flags.Var((funcBoolVar)(func(b bool) error {
c.Syslog.Enabled = b
c.set("syslog")
Expand Down Expand Up @@ -410,6 +418,8 @@ Options:
omitted, it is assumed to be the same as the source
-exclude=<src> Provides a prefix to exclude from replication

-excludematch=<src> Provides a path match to exclude from replication

-wait=<duration> Sets the 'minumum(:maximum)' amount of time to wait
before replicating
-retry=<duration> The amount of time to wait if Consul returns an
Expand Down
19 changes: 19 additions & 0 deletions cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,25 @@ func TestParseFlags_excludes(t *testing.T) {
}
}

func TestParseFlags_excludematches(t *testing.T) {
cli := NewCLI(ioutil.Discard, ioutil.Discard)
config, _, _, err := cli.parseFlags([]string{
"-excludematch", "excludematched/",
})
if err != nil {
t.Fatal(err)
}

if len(config.ExcludeMatches) != 1 {
t.Fatal("expected 1 exclude match")
}

excludematch := config.ExcludeMatches[0]
if excludematch.Source != "excludematched/" {
t.Errorf("expected %q to be %q", excludematch.Source, "excludematched/")
}
}

func TestParseFlags_syslog(t *testing.T) {
cli := NewCLI(ioutil.Discard, ioutil.Discard)
config, _, _, err := cli.parseFlags([]string{
Expand Down
28 changes: 28 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type Config struct {
// Excludes is the list of key prefixes to exclude from replication.
Excludes []*Exclude `mapstructure:"exclude"`

// ExcludeMatches is the list of key match expressions to exclude from replication.
ExcludeMatches []*ExcludeMatch `mapstructure:"excludematch"`

// Auth is the HTTP basic authentication for communicating with Consul.
Auth *AuthConfig `mapstructure:"auth"`

Expand Down Expand Up @@ -129,6 +132,13 @@ func (c *Config) Copy() *Config {
}
}

o.ExcludeMatches = make([]*ExcludeMatch, len(c.ExcludeMatches))
for i, p := range c.ExcludeMatches {
o.ExcludeMatches[i] = &ExcludeMatch{
Source: p.Source,
}
}

o.Retry = c.Retry

if c.Wait != nil {
Expand Down Expand Up @@ -259,6 +269,18 @@ func (c *Config) Merge(o *Config) {
}
}

if o.ExcludeMatches != nil {
if c.ExcludeMatches == nil {
c.ExcludeMatches = []*ExcludeMatch{}
}

for _, excludematch := range o.ExcludeMatches {
c.ExcludeMatches = append(c.ExcludeMatches, &ExcludeMatch{
Source: excludematch.Source,
})
}
}

if o.WasSet("retry") {
c.Retry = o.Retry
}
Expand Down Expand Up @@ -426,6 +448,7 @@ func DefaultConfig() *Config {
LogLevel: logLevel,
Prefixes: []*Prefix{},
Excludes: []*Exclude{},
ExcludeMatches: []*ExcludeMatch{},
Retry: 5 * time.Second,
StatusDir: "service/consul-replicate/statuses",
Wait: &config.WaitConfig{
Expand Down Expand Up @@ -532,6 +555,11 @@ type Exclude struct {
Source string `mapstructure:"source"`
}

// Exclude is a key path match to exclude from replication
type ExcludeMatch struct {
Source string `mapstructure:"source"`
}

// ParsePrefix parses a prefix of the format "source@dc:destination" into the
// Prefix component.
func ParsePrefix(s string) (*Prefix, error) {
Expand Down
27 changes: 27 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,32 @@ func TestMerge_Excludes(t *testing.T) {
}
}

func TestMerge_ExcludeMatches(t *testing.T) {
config1 := testConfig(`
excludematch {
source = "foo"
}
`, t)
config2 := testConfig(`
excludematch {
source = "foo-2"
}
`, t)
config1.Merge(config2)

if len(config1.ExcludeMatches) != 2 {
t.Fatalf("bad exclude matches %d", len(config1.ExcludeMatches))
}

if config1.ExcludeMatches[0].Source != "foo" {
t.Errorf("bad source: %#v", config1.ExcludeMatches[0].Source)
}

if config1.ExcludeMatches[1].Source != "foo-2" {
t.Errorf("bad source: %#v", config1.ExcludeMatches[1].Source)
}
}

func TestMerge_wait(t *testing.T) {
config1 := testConfig(`
wait = "1s:1s"
Expand Down Expand Up @@ -331,6 +357,7 @@ func TestParseConfig_correctValues(t *testing.T) {
},
Prefixes: []*Prefix{},
Excludes: []*Exclude{},
ExcludeMatches: []*ExcludeMatch{},
SSL: &SSLConfig{
Enabled: true,
Verify: false,
Expand Down
35 changes: 32 additions & 3 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (r *Runner) Run() error {

// Replicate each prefix in a goroutine
for _, prefix := range prefixes {
go r.replicate(prefix, r.config.Excludes, doneCh, errCh)
go r.replicate(prefix, r.config.Excludes, r.config.ExcludeMatches, doneCh, errCh)
}

var errs *multierror.Error
Expand Down Expand Up @@ -273,7 +273,7 @@ func (r *Runner) get(prefix *Prefix) (*watch.View, bool) {
// replicate performs replication into the current datacenter from the given
// prefix. This function is designed to be called via a goroutine since it is
// expensive and needs to be parallelized.
func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan struct{}, errCh chan error) {
func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, excludematches []*ExcludeMatch, doneCh chan struct{}, errCh chan error) {
// Ensure we are not self-replicating
info, err := r.clients.Consul().Agent().Self()
if err != nil {
Expand Down Expand Up @@ -334,6 +334,22 @@ func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan stru
}
}

// Ignore if the key falls under an exclude path match
if len(excludematches) > 0 {
excludematched := false
for _, excludematch := range excludematches {
if strings.Contains(pair.Path, excludematch.Source) {
log.Printf("[DEBUG] (runner) key %q contains %q, excluding",
pair.Path, excludematch.Source)
excludematched = true
}
}

if excludematched {
continue
}
}

// Ignore if the modify index is old
if pair.ModifyIndex <= status.LastReplicated {
log.Printf("[DEBUG] (runner) skipping because %q is already "+
Expand Down Expand Up @@ -380,6 +396,7 @@ func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan stru
}
for _, key := range localKeys {
excluded := false
excludematched := false

// Ignore if the key falls under an excluded prefix
if len(excludes) > 0 {
Expand All @@ -393,7 +410,19 @@ func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan stru
}
}

if _, ok := usedKeys[key]; !ok && !excluded {
// Ignore if the key falls under an excluded match path
if !excluded && len(excludematches) > 0 {
sourceKey := strings.Replace(key, prefix.Destination, prefix.Source, -1)
for _, excludematch := range excludematches {
if strings.Contains(sourceKey, excludematch.Source) {
log.Printf("[DEBUG] (runner) key %q contains %q, excluding from deletes",
sourceKey, excludematch.Source)
excludematched = true
}
}
}

if _, ok := usedKeys[key]; !ok && !excluded && !excludematched {
if _, err := kv.Delete(key, nil); err != nil {
errCh <- fmt.Errorf("failed to delete %q: %s", key, err)
return
Expand Down
3 changes: 3 additions & 0 deletions runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ func TestNewRunner_initialize(t *testing.T) {
Excludes: []*Exclude{
&Exclude{Source: "3"},
},
ExcludeMatches: []*ExcludeMatch{
&ExcludeMatch{Source: "2"},
},
}

runner, err := NewRunner(config, once)
Expand Down
14 changes: 14 additions & 0 deletions scripts/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ $CONSUL_REPLICATE_BIN \
-consul $ADDRESS_DC2 \
-prefix "global@dc1:backup" \
-exclude "global/$EXCLUDED_KEY" \
-excludematch "excluded_" \
-log-level $LOG_LEVEL &
CONSUL_REPLICATE_PID=$!
sleep 3
Expand All @@ -99,6 +100,7 @@ curl -sLo /dev/null -X PUT $ADDRESS_DC1/v1/kv/global/six -d "six"
sleep 3
curl -sL $ADDRESS_DC2/v1/kv/backup/six | grep -q "c2l4"

echo "##Test Case #1"
echo " Writing a key in DC2"
curl -sLo /dev/null -X PUT $ADDRESS_DC2/v1/kv/backup/$EXCLUDED_KEY/nodelete -d "don't delete"
sleep 3
Expand All @@ -110,6 +112,18 @@ sleep 3
echo " Checking key still exists in DC2"
curl -sL $ADDRESS_DC2/v1/kv/backup/$EXCLUDED_KEY/nodelete | grep -q "ZG9uJ3QgZGVsZXRl"

echo "##Test Case #2"
echo " Writing a key in DC2"
curl -sLo /dev/null -X PUT $ADDRESS_DC2/v1/kv/backup/excluded_key -d "don't delete"
sleep 3

echo " Updating prefix in DC1"
curl -sLo /dev/null -X PUT $ADDRESS_DC1/v1/kv/global/parent_folder/other_folder/anykey -d "test data"
sleep 3

echo " Checking key still exists in DC2"
curl -sL $ADDRESS_DC2/v1/kv/backup/excluded_key | grep -q "ZG9uJ3QgZGVsZXRl"

rm -rf $DATADIR_DC1
rm -rf $DATADIR_DC2

Expand Down