diff --git a/third_party/BUILD b/third_party/BUILD index a6b23dbb6724d..801e43d550120 100644 --- a/third_party/BUILD +++ b/third_party/BUILD @@ -13,6 +13,8 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//third_party/forked/etcd221/pkg/fileutil:all-srcs", + "//third_party/forked/etcd221/wal:all-srcs", "//third_party/forked/etcd237/pkg/fileutil:all-srcs", "//third_party/forked/etcd237/wal:all-srcs", "//third_party/forked/golang/expansion:all-srcs", diff --git a/third_party/forked/etcd221/README.md b/third_party/forked/etcd221/README.md new file mode 100644 index 0000000000000..548aee2a53495 --- /dev/null +++ b/third_party/forked/etcd221/README.md @@ -0,0 +1 @@ +Forked from etcd 2.2 release branch to support migration from 3.0 WAL to 2.2 WAL format diff --git a/third_party/forked/etcd221/pkg/fileutil/BUILD b/third_party/forked/etcd221/pkg/fileutil/BUILD new file mode 100644 index 0000000000000..1749bf95c9363 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/BUILD @@ -0,0 +1,46 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = [ + "fileutil_test.go", + "lock_test.go", + "preallocate_test.go", + "purge_test.go", + ], + library = ":go_default_library", + tags = ["automanaged"], +) + +go_library( + name = "go_default_library", + srcs = [ + "fileutil.go", + "lock_unix.go", + "preallocate.go", + "purge.go", + ], + tags = ["automanaged"], + deps = ["//vendor:github.com/coreos/pkg/capnslog"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/third_party/forked/etcd221/pkg/fileutil/fileutil.go b/third_party/forked/etcd221/pkg/fileutil/fileutil.go new file mode 100644 index 0000000000000..b052553c11dd5 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/fileutil.go @@ -0,0 +1,57 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "io/ioutil" + "os" + "path" + "sort" + + "github.com/coreos/pkg/capnslog" +) + +const ( + privateFileMode = 0600 +) + +var ( + plog = capnslog.NewPackageLogger("github.com/coreos/etcd/pkg", "fileutil") +) + +// IsDirWriteable checks if dir is writable by writing and removing a file +// to dir. It returns nil if dir is writable. +func IsDirWriteable(dir string) error { + f := path.Join(dir, ".touch") + if err := ioutil.WriteFile(f, []byte(""), privateFileMode); err != nil { + return err + } + return os.Remove(f) +} + +// ReadDir returns the filenames in the given directory in sorted order. +func ReadDir(dirpath string) ([]string, error) { + dir, err := os.Open(dirpath) + if err != nil { + return nil, err + } + defer dir.Close() + names, err := dir.Readdirnames(-1) + if err != nil { + return nil, err + } + sort.Strings(names) + return names, nil +} diff --git a/third_party/forked/etcd221/pkg/fileutil/fileutil_test.go b/third_party/forked/etcd221/pkg/fileutil/fileutil_test.go new file mode 100644 index 0000000000000..e5a9eb24d41f5 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/fileutil_test.go @@ -0,0 +1,67 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "io/ioutil" + "os" + "path/filepath" + "reflect" + "testing" +) + +func TestIsDirWriteable(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected ioutil.TempDir error: %v", err) + } + defer os.RemoveAll(tmpdir) + if err := IsDirWriteable(tmpdir); err != nil { + t.Fatalf("unexpected IsDirWriteable error: %v", err) + } + if err := os.Chmod(tmpdir, 0444); err != nil { + t.Fatalf("unexpected os.Chmod error: %v", err) + } + if err := IsDirWriteable(tmpdir); err == nil { + t.Fatalf("expected IsDirWriteable to error") + } +} + +func TestReadDir(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "") + defer os.RemoveAll(tmpdir) + if err != nil { + t.Fatalf("unexpected ioutil.TempDir error: %v", err) + } + files := []string{"def", "abc", "xyz", "ghi"} + for _, f := range files { + var fh *os.File + fh, err = os.Create(filepath.Join(tmpdir, f)) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + if err := fh.Close(); err != nil { + t.Fatalf("error closing file: %v", err) + } + } + fs, err := ReadDir(tmpdir) + if err != nil { + t.Fatalf("error calling ReadDir: %v", err) + } + wfs := []string{"abc", "def", "ghi", "xyz"} + if !reflect.DeepEqual(fs, wfs) { + t.Fatalf("ReadDir: got %v, want %v", fs, wfs) + } +} diff --git a/third_party/forked/etcd221/pkg/fileutil/lock_plan9.go b/third_party/forked/etcd221/pkg/fileutil/lock_plan9.go new file mode 100644 index 0000000000000..311288de03198 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/lock_plan9.go @@ -0,0 +1,90 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "errors" + "os" + "syscall" + "time" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type Lock interface { + Name() string + TryLock() error + Lock() error + Unlock() error + Destroy() error +} + +type lock struct { + fname string + file *os.File +} + +func (l *lock) Name() string { + return l.fname +} + +// TryLock acquires exclusivity on the lock without blocking +func (l *lock) TryLock() error { + err := os.Chmod(l.fname, syscall.DMEXCL|0600) + if err != nil { + return err + } + + f, err := os.Open(l.fname) + if err != nil { + return ErrLocked + } + + l.file = f + return nil +} + +// Lock acquires exclusivity on the lock with blocking +func (l *lock) Lock() error { + err := os.Chmod(l.fname, syscall.DMEXCL|0600) + if err != nil { + return err + } + + for { + f, err := os.Open(l.fname) + if err == nil { + l.file = f + return nil + } + time.Sleep(10 * time.Millisecond) + } +} + +// Unlock unlocks the lock +func (l *lock) Unlock() error { + return l.file.Close() +} + +func (l *lock) Destroy() error { + return nil +} + +func NewLock(file string) (Lock, error) { + l := &lock{fname: file} + return l, nil +} diff --git a/third_party/forked/etcd221/pkg/fileutil/lock_solaris.go b/third_party/forked/etcd221/pkg/fileutil/lock_solaris.go new file mode 100644 index 0000000000000..1929bd1fc8c10 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/lock_solaris.go @@ -0,0 +1,98 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build solaris + +package fileutil + +import ( + "errors" + "os" + "syscall" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type Lock interface { + Name() string + TryLock() error + Lock() error + Unlock() error + Destroy() error +} + +type lock struct { + fd int + file *os.File +} + +func (l *lock) Name() string { + return l.file.Name() +} + +// TryLock acquires exclusivity on the lock without blocking +func (l *lock) TryLock() error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Pid = 0 + lock.Type = syscall.F_WRLCK + lock.Whence = 0 + lock.Pid = 0 + err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock) + if err != nil && err == syscall.EAGAIN { + return ErrLocked + } + return err +} + +// Lock acquires exclusivity on the lock without blocking +func (l *lock) Lock() error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Type = syscall.F_WRLCK + lock.Whence = 0 + lock.Pid = 0 + return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock) +} + +// Unlock unlocks the lock +func (l *lock) Unlock() error { + var lock syscall.Flock_t + lock.Start = 0 + lock.Len = 0 + lock.Type = syscall.F_UNLCK + lock.Whence = 0 + err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock) + if err != nil && err == syscall.EAGAIN { + return ErrLocked + } + return err +} + +func (l *lock) Destroy() error { + return l.file.Close() +} + +func NewLock(file string) (Lock, error) { + f, err := os.OpenFile(file, os.O_WRONLY, 0600) + if err != nil { + return nil, err + } + l := &lock{int(f.Fd()), f} + return l, nil +} diff --git a/third_party/forked/etcd221/pkg/fileutil/lock_test.go b/third_party/forked/etcd221/pkg/fileutil/lock_test.go new file mode 100644 index 0000000000000..cda47deea41d4 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/lock_test.go @@ -0,0 +1,96 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "io/ioutil" + "os" + "testing" + "time" +) + +func TestLockAndUnlock(t *testing.T) { + f, err := ioutil.TempFile("", "lock") + if err != nil { + t.Fatal(err) + } + f.Close() + defer func() { + err := os.Remove(f.Name()) + if err != nil { + t.Fatal(err) + } + }() + + // lock the file + l, err := NewLock(f.Name()) + if err != nil { + t.Fatal(err) + } + defer l.Destroy() + err = l.Lock() + if err != nil { + t.Fatal(err) + } + + // try lock a locked file + dupl, err := NewLock(f.Name()) + if err != nil { + t.Fatal(err) + } + err = dupl.TryLock() + if err != ErrLocked { + t.Errorf("err = %v, want %v", err, ErrLocked) + } + + // unlock the file + err = l.Unlock() + if err != nil { + t.Fatal(err) + } + + // try lock the unlocked file + err = dupl.TryLock() + if err != nil { + t.Errorf("err = %v, want %v", err, nil) + } + defer dupl.Destroy() + + // blocking on locked file + locked := make(chan struct{}, 1) + go func() { + l.Lock() + locked <- struct{}{} + }() + + select { + case <-locked: + t.Error("unexpected unblocking") + case <-time.After(10 * time.Millisecond): + } + + // unlock + err = dupl.Unlock() + if err != nil { + t.Fatal(err) + } + + // the previously blocked routine should be unblocked + select { + case <-locked: + case <-time.After(20 * time.Millisecond): + t.Error("unexpected blocking") + } +} diff --git a/third_party/forked/etcd221/pkg/fileutil/lock_unix.go b/third_party/forked/etcd221/pkg/fileutil/lock_unix.go new file mode 100644 index 0000000000000..f6e69cc344dc7 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/lock_unix.go @@ -0,0 +1,76 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !windows,!plan9,!solaris + +package fileutil + +import ( + "errors" + "os" + "syscall" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type Lock interface { + Name() string + TryLock() error + Lock() error + Unlock() error + Destroy() error +} + +type lock struct { + fd int + file *os.File +} + +func (l *lock) Name() string { + return l.file.Name() +} + +// TryLock acquires exclusivity on the lock without blocking +func (l *lock) TryLock() error { + err := syscall.Flock(l.fd, syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil && err == syscall.EWOULDBLOCK { + return ErrLocked + } + return err +} + +// Lock acquires exclusivity on the lock without blocking +func (l *lock) Lock() error { + return syscall.Flock(l.fd, syscall.LOCK_EX) +} + +// Unlock unlocks the lock +func (l *lock) Unlock() error { + return syscall.Flock(l.fd, syscall.LOCK_UN) +} + +func (l *lock) Destroy() error { + return l.file.Close() +} + +func NewLock(file string) (Lock, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + l := &lock{int(f.Fd()), f} + return l, nil +} diff --git a/third_party/forked/etcd221/pkg/fileutil/lock_windows.go b/third_party/forked/etcd221/pkg/fileutil/lock_windows.go new file mode 100644 index 0000000000000..a0a928b90ad27 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/lock_windows.go @@ -0,0 +1,71 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build windows + +package fileutil + +import ( + "errors" + "os" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type Lock interface { + Name() string + TryLock() error + Lock() error + Unlock() error + Destroy() error +} + +type lock struct { + fd int + file *os.File +} + +func (l *lock) Name() string { + return l.file.Name() +} + +// TryLock acquires exclusivity on the lock without blocking +func (l *lock) TryLock() error { + return nil +} + +// Lock acquires exclusivity on the lock without blocking +func (l *lock) Lock() error { + return nil +} + +// Unlock unlocks the lock +func (l *lock) Unlock() error { + return nil +} + +func (l *lock) Destroy() error { + return l.file.Close() +} + +func NewLock(file string) (Lock, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + l := &lock{int(f.Fd()), f} + return l, nil +} diff --git a/third_party/forked/etcd221/pkg/fileutil/perallocate_unsupported.go b/third_party/forked/etcd221/pkg/fileutil/perallocate_unsupported.go new file mode 100644 index 0000000000000..c1a952bb79686 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/perallocate_unsupported.go @@ -0,0 +1,28 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !linux + +package fileutil + +import "os" + +// Preallocate tries to allocate the space for given +// file. This operation is only supported on linux by a +// few filesystems (btrfs, ext4, etc.). +// If the operation is unsupported, no error will be returned. +// Otherwise, the error encountered will be returned. +func Preallocate(f *os.File, sizeInBytes int) error { + return nil +} diff --git a/third_party/forked/etcd221/pkg/fileutil/preallocate.go b/third_party/forked/etcd221/pkg/fileutil/preallocate.go new file mode 100644 index 0000000000000..c4bd4f4c815ca --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/preallocate.go @@ -0,0 +1,42 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package fileutil + +import ( + "os" + "syscall" +) + +// Preallocate tries to allocate the space for given +// file. This operation is only supported on linux by a +// few filesystems (btrfs, ext4, etc.). +// If the operation is unsupported, no error will be returned. +// Otherwise, the error encountered will be returned. +func Preallocate(f *os.File, sizeInBytes int) error { + // use mode = 1 to keep size + // see FALLOC_FL_KEEP_SIZE + err := syscall.Fallocate(int(f.Fd()), 1, 0, int64(sizeInBytes)) + if err != nil { + errno, ok := err.(syscall.Errno) + // treat not support as nil error + if ok && errno == syscall.ENOTSUP { + return nil + } + return err + } + return nil +} diff --git a/third_party/forked/etcd221/pkg/fileutil/preallocate_test.go b/third_party/forked/etcd221/pkg/fileutil/preallocate_test.go new file mode 100644 index 0000000000000..d5f2a71f304b4 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/preallocate_test.go @@ -0,0 +1,53 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "io/ioutil" + "os" + "runtime" + "testing" +) + +func TestPreallocate(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skipf("skip testPreallocate, OS = %s", runtime.GOOS) + } + + p, err := ioutil.TempDir(os.TempDir(), "preallocateTest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + f, err := ioutil.TempFile(p, "") + if err != nil { + t.Fatal(err) + } + + size := 64 * 1000 + err = Preallocate(f, size) + if err != nil { + t.Fatal(err) + } + + stat, err := f.Stat() + if err != nil { + t.Fatal(err) + } + if stat.Size() != 0 { + t.Errorf("size = %d, want %d", stat.Size(), 0) + } +} diff --git a/third_party/forked/etcd221/pkg/fileutil/purge.go b/third_party/forked/etcd221/pkg/fileutil/purge.go new file mode 100644 index 0000000000000..375aa97197446 --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/purge.go @@ -0,0 +1,80 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "os" + "path" + "sort" + "strings" + "time" +) + +func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error { + errC := make(chan error, 1) + go func() { + for { + fnames, err := ReadDir(dirname) + if err != nil { + errC <- err + return + } + newfnames := make([]string, 0) + for _, fname := range fnames { + if strings.HasSuffix(fname, suffix) { + newfnames = append(newfnames, fname) + } + } + sort.Strings(newfnames) + for len(newfnames) > int(max) { + f := path.Join(dirname, newfnames[0]) + l, err := NewLock(f) + if err != nil { + errC <- err + return + } + err = l.TryLock() + if err != nil { + break + } + err = os.Remove(f) + if err != nil { + errC <- err + return + } + err = l.Unlock() + if err != nil { + plog.Errorf("error unlocking %s when purging file (%v)", l.Name(), err) + errC <- err + return + } + err = l.Destroy() + if err != nil { + plog.Errorf("error destroying lock %s when purging file (%v)", l.Name(), err) + errC <- err + return + } + plog.Infof("purged file %s successfully", f) + newfnames = newfnames[1:] + } + select { + case <-time.After(interval): + case <-stop: + return + } + } + }() + return errC +} diff --git a/third_party/forked/etcd221/pkg/fileutil/purge_test.go b/third_party/forked/etcd221/pkg/fileutil/purge_test.go new file mode 100644 index 0000000000000..0b11b23a579bf --- /dev/null +++ b/third_party/forked/etcd221/pkg/fileutil/purge_test.go @@ -0,0 +1,132 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "reflect" + "testing" + "time" +) + +func TestPurgeFile(t *testing.T) { + dir, err := ioutil.TempDir("", "purgefile") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + for i := 0; i < 5; i++ { + _, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + } + + stop := make(chan struct{}) + errch := PurgeFile(dir, "test", 3, time.Millisecond, stop) + for i := 5; i < 10; i++ { + _, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + } + fnames, err := ReadDir(dir) + if err != nil { + t.Fatal(err) + } + wnames := []string{"7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + select { + case err := <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + close(stop) +} + +func TestPurgeFileHoldingLock(t *testing.T) { + dir, err := ioutil.TempDir("", "purgefile") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + for i := 0; i < 10; i++ { + _, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + } + + // create a purge barrier at 5 + l, err := NewLock(path.Join(dir, fmt.Sprintf("%d.test", 5))) + err = l.Lock() + if err != nil { + t.Fatal(err) + } + + stop := make(chan struct{}) + errch := PurgeFile(dir, "test", 3, time.Millisecond, stop) + time.Sleep(20 * time.Millisecond) + + fnames, err := ReadDir(dir) + if err != nil { + t.Fatal(err) + } + wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + select { + case err := <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + + // remove the purge barrier + err = l.Unlock() + if err != nil { + t.Fatal(err) + } + err = l.Destroy() + if err != nil { + t.Fatal(err) + } + + time.Sleep(20 * time.Millisecond) + + fnames, err = ReadDir(dir) + if err != nil { + t.Fatal(err) + } + wnames = []string{"7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + select { + case err := <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + + close(stop) +} diff --git a/third_party/forked/etcd221/wal/BUILD b/third_party/forked/etcd221/wal/BUILD new file mode 100644 index 0000000000000..797bc80cd03f8 --- /dev/null +++ b/third_party/forked/etcd221/wal/BUILD @@ -0,0 +1,49 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "decoder.go", + "doc.go", + "encoder.go", + "metrics.go", + "multi_readcloser.go", + "repair.go", + "util.go", + "wal.go", + ], + tags = ["automanaged"], + deps = [ + "//third_party/forked/etcd221/pkg/fileutil:go_default_library", + "//vendor:github.com/coreos/etcd/pkg/crc", + "//vendor:github.com/coreos/etcd/pkg/pbutil", + "//vendor:github.com/coreos/etcd/raft", + "//vendor:github.com/coreos/etcd/raft/raftpb", + "//vendor:github.com/coreos/etcd/wal/walpb", + "//vendor:github.com/coreos/pkg/capnslog", + "//vendor:github.com/prometheus/client_golang/prometheus", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//third_party/forked/etcd221/wal/walpb:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/third_party/forked/etcd221/wal/decoder.go b/third_party/forked/etcd221/wal/decoder.go new file mode 100644 index 0000000000000..f75c919fba6e2 --- /dev/null +++ b/third_party/forked/etcd221/wal/decoder.go @@ -0,0 +1,103 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bufio" + "encoding/binary" + "hash" + "io" + "sync" + + "github.com/coreos/etcd/pkg/crc" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal/walpb" +) + +type decoder struct { + mu sync.Mutex + br *bufio.Reader + + c io.Closer + crc hash.Hash32 +} + +func newDecoder(rc io.ReadCloser) *decoder { + return &decoder{ + br: bufio.NewReader(rc), + c: rc, + crc: crc.New(0, crcTable), + } +} + +func (d *decoder) decode(rec *walpb.Record) error { + d.mu.Lock() + defer d.mu.Unlock() + + rec.Reset() + l, err := readInt64(d.br) + if err != nil { + return err + } + data := make([]byte, l) + if _, err = io.ReadFull(d.br, data); err != nil { + // ReadFull returns io.EOF only if no bytes were read + // the decoder should treat this as an ErrUnexpectedEOF instead. + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return err + } + if err := rec.Unmarshal(data); err != nil { + return err + } + // skip crc checking if the record type is crcType + if rec.Type == crcType { + return nil + } + d.crc.Write(rec.Data) + return rec.Validate(d.crc.Sum32()) +} + +func (d *decoder) updateCRC(prevCrc uint32) { + d.crc = crc.New(prevCrc, crcTable) +} + +func (d *decoder) lastCRC() uint32 { + return d.crc.Sum32() +} + +func (d *decoder) close() error { + return d.c.Close() +} + +func mustUnmarshalEntry(d []byte) raftpb.Entry { + var e raftpb.Entry + pbutil.MustUnmarshal(&e, d) + return e +} + +func mustUnmarshalState(d []byte) raftpb.HardState { + var s raftpb.HardState + pbutil.MustUnmarshal(&s, d) + return s +} + +func readInt64(r io.Reader) (int64, error) { + var n int64 + err := binary.Read(r, binary.LittleEndian, &n) + return n, err +} diff --git a/third_party/forked/etcd221/wal/doc.go b/third_party/forked/etcd221/wal/doc.go new file mode 100644 index 0000000000000..769b522f040b5 --- /dev/null +++ b/third_party/forked/etcd221/wal/doc.go @@ -0,0 +1,68 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package wal provides an implementation of a write ahead log that is used by +etcd. + +A WAL is created at a particular directory and is made up of a number of +segmented WAL files. Inside of each file the raft state and entries are appended +to it with the Save method: + + metadata := []byte{} + w, err := wal.Create("/var/lib/etcd", metadata) + ... + err := w.Save(s, ents) + +After saving an raft snapshot to disk, SaveSnapshot method should be called to +record it. So WAL can match with the saved snapshot when restarting. + + err := w.SaveSnapshot(walpb.Snapshot{Index: 10, Term: 2}) + +When a user has finished using a WAL it must be closed: + + w.Close() + +WAL files are placed inside of the directory in the following format: +$seq-$index.wal + +The first WAL file to be created will be 0000000000000000-0000000000000000.wal +indicating an initial sequence of 0 and an initial raft index of 0. The first +entry written to WAL MUST have raft index 0. + +WAL will cuts its current wal files if its size exceeds 8MB. This will increment an internal +sequence number and cause a new file to be created. If the last raft index saved +was 0x20 and this is the first time cut has been called on this WAL then the sequence will +increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal. +If a second cut issues 0x10 entries with incremental index later then the file will be called: +0000000000000002-0000000000000031.wal. + +At a later time a WAL can be opened at a particular snapshot. If there is no +snapshot, an empty snapshot should be passed in. + + w, err := wal.Open("/var/lib/etcd", walpb.Snapshot{Index: 10, Term: 2}) + ... + +The snapshot must have been written to the WAL. + +Additional items cannot be Saved to this WAL until all of the items from the given +snapshot to the end of the WAL are read first: + + metadata, state, ents, err := w.ReadAll() + +This will give you the metadata, the last raft.State and the slice of +raft.Entry items in the log. + +*/ +package wal diff --git a/third_party/forked/etcd221/wal/encoder.go b/third_party/forked/etcd221/wal/encoder.go new file mode 100644 index 0000000000000..959f90ad4d686 --- /dev/null +++ b/third_party/forked/etcd221/wal/encoder.go @@ -0,0 +1,89 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bufio" + "encoding/binary" + "hash" + "io" + "sync" + + "github.com/coreos/etcd/pkg/crc" + "github.com/coreos/etcd/wal/walpb" +) + +type encoder struct { + mu sync.Mutex + bw *bufio.Writer + + crc hash.Hash32 + buf []byte + uint64buf []byte +} + +func newEncoder(w io.Writer, prevCrc uint32) *encoder { + return &encoder{ + bw: bufio.NewWriter(w), + crc: crc.New(prevCrc, crcTable), + // 1MB buffer + buf: make([]byte, 1024*1024), + uint64buf: make([]byte, 8), + } +} + +func (e *encoder) encode(rec *walpb.Record) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.crc.Write(rec.Data) + rec.Crc = e.crc.Sum32() + var ( + data []byte + err error + n int + ) + + if rec.Size() > len(e.buf) { + data, err = rec.Marshal() + if err != nil { + return err + } + } else { + n, err = rec.MarshalTo(e.buf) + if err != nil { + return err + } + data = e.buf[:n] + } + if err := writeInt64(e.bw, int64(len(data)), e.uint64buf); err != nil { + return err + } + _, err = e.bw.Write(data) + return err +} + +func (e *encoder) flush() error { + e.mu.Lock() + defer e.mu.Unlock() + return e.bw.Flush() +} + +func writeInt64(w io.Writer, n int64, buf []byte) error { + // http://golang.org/src/encoding/binary/binary.go + binary.LittleEndian.PutUint64(buf, uint64(n)) + _, err := w.Write(buf) + return err +} diff --git a/third_party/forked/etcd221/wal/metrics.go b/third_party/forked/etcd221/wal/metrics.go new file mode 100644 index 0000000000000..578ee91c86e40 --- /dev/null +++ b/third_party/forked/etcd221/wal/metrics.go @@ -0,0 +1,37 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import "github.com/prometheus/client_golang/prometheus" + +var ( + syncDurations = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "etcd", + Subsystem: "wal", + Name: "fsync_durations_microseconds", + Help: "The latency distributions of fsync called by wal.", + }) + lastIndexSaved = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "wal", + Name: "last_index_saved", + Help: "The index of the last entry saved by wal.", + }) +) + +func init() { + prometheus.MustRegister(syncDurations) + prometheus.MustRegister(lastIndexSaved) +} diff --git a/third_party/forked/etcd221/wal/multi_readcloser.go b/third_party/forked/etcd221/wal/multi_readcloser.go new file mode 100644 index 0000000000000..513c6d17d941b --- /dev/null +++ b/third_party/forked/etcd221/wal/multi_readcloser.go @@ -0,0 +1,45 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import "io" + +type multiReadCloser struct { + closers []io.Closer + reader io.Reader +} + +func (mc *multiReadCloser) Close() error { + var err error + for i := range mc.closers { + err = mc.closers[i].Close() + } + return err +} + +func (mc *multiReadCloser) Read(p []byte) (int, error) { + return mc.reader.Read(p) +} + +func MultiReadCloser(readClosers ...io.ReadCloser) io.ReadCloser { + cs := make([]io.Closer, len(readClosers)) + rs := make([]io.Reader, len(readClosers)) + for i := range readClosers { + cs[i] = readClosers[i] + rs[i] = readClosers[i] + } + r := io.MultiReader(rs...) + return &multiReadCloser{cs, r} +} diff --git a/third_party/forked/etcd221/wal/repair.go b/third_party/forked/etcd221/wal/repair.go new file mode 100644 index 0000000000000..55d6bcfc7a868 --- /dev/null +++ b/third_party/forked/etcd221/wal/repair.go @@ -0,0 +1,107 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "io" + "os" + "path" + + "k8s.io/kubernetes/third_party/forked/etcd221/pkg/fileutil" + + "github.com/coreos/etcd/wal/walpb" +) + +// Repair tries to repair the unexpectedEOF error in the +// last wal file by truncating. +func Repair(dirpath string) bool { + f, err := openLast(dirpath) + if err != nil { + return false + } + defer f.Close() + + n := 0 + rec := &walpb.Record{} + + decoder := newDecoder(f) + defer decoder.close() + for { + err := decoder.decode(rec) + switch err { + case nil: + n += 8 + rec.Size() + // update crc of the decoder when necessary + switch rec.Type { + case crcType: + crc := decoder.crc.Sum32() + // current crc of decoder must match the crc of the record. + // do no need to match 0 crc, since the decoder is a new one at this case. + if crc != 0 && rec.Validate(crc) != nil { + return false + } + decoder.updateCRC(rec.Crc) + } + continue + case io.EOF: + return true + case io.ErrUnexpectedEOF: + plog.Noticef("repairing %v", f.Name()) + bf, bferr := os.Create(f.Name() + ".broken") + if bferr != nil { + plog.Errorf("could not repair %v, failed to create backup file", f.Name()) + return false + } + defer bf.Close() + + if _, err = f.Seek(0, os.SEEK_SET); err != nil { + plog.Errorf("could not repair %v, failed to read file", f.Name()) + return false + } + + if _, err = io.Copy(bf, f); err != nil { + plog.Errorf("could not repair %v, failed to copy file", f.Name()) + return false + } + + if err = f.Truncate(int64(n)); err != nil { + plog.Errorf("could not repair %v, failed to truncate file", f.Name()) + return false + } + if err = f.Sync(); err != nil { + plog.Errorf("could not repair %v, failed to sync file", f.Name()) + return false + } + return true + default: + plog.Errorf("could not repair error (%v)", err) + return false + } + } +} + +// openLast opens the last wal file for read and write. +func openLast(dirpath string) (*os.File, error) { + names, err := fileutil.ReadDir(dirpath) + if err != nil { + return nil, err + } + names = checkWalNames(names) + if len(names) == 0 { + return nil, ErrFileNotFound + } + last := path.Join(dirpath, names[len(names)-1]) + return os.OpenFile(last, os.O_RDWR, 0) +} diff --git a/third_party/forked/etcd221/wal/util.go b/third_party/forked/etcd221/wal/util.go new file mode 100644 index 0000000000000..b40efc8b1220b --- /dev/null +++ b/third_party/forked/etcd221/wal/util.go @@ -0,0 +1,93 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "errors" + "fmt" + "strings" + + "k8s.io/kubernetes/third_party/forked/etcd221/pkg/fileutil" +) + +var ( + badWalName = errors.New("bad wal name") +) + +func Exist(dirpath string) bool { + names, err := fileutil.ReadDir(dirpath) + if err != nil { + return false + } + return len(names) != 0 +} + +// searchIndex returns the last array index of names whose raft index section is +// equal to or smaller than the given index. +// The given names MUST be sorted. +func searchIndex(names []string, index uint64) (int, bool) { + for i := len(names) - 1; i >= 0; i-- { + name := names[i] + _, curIndex, err := parseWalName(name) + if err != nil { + plog.Panicf("parse correct name should never fail: %v", err) + } + if index >= curIndex { + return i, true + } + } + return -1, false +} + +// names should have been sorted based on sequence number. +// isValidSeq checks whether seq increases continuously. +func isValidSeq(names []string) bool { + var lastSeq uint64 + for _, name := range names { + curSeq, _, err := parseWalName(name) + if err != nil { + plog.Panicf("parse correct name should never fail: %v", err) + } + if lastSeq != 0 && lastSeq != curSeq-1 { + return false + } + lastSeq = curSeq + } + return true +} + +func checkWalNames(names []string) []string { + wnames := make([]string, 0) + for _, name := range names { + if _, _, err := parseWalName(name); err != nil { + plog.Warningf("ignored file %v in wal", name) + continue + } + wnames = append(wnames, name) + } + return wnames +} + +func parseWalName(str string) (seq, index uint64, err error) { + if !strings.HasSuffix(str, ".wal") { + return 0, 0, badWalName + } + _, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index) + return seq, index, err +} + +func walName(seq, index uint64) string { + return fmt.Sprintf("%016x-%016x.wal", seq, index) +} diff --git a/third_party/forked/etcd221/wal/wal.go b/third_party/forked/etcd221/wal/wal.go new file mode 100644 index 0000000000000..a39ef52b44ec6 --- /dev/null +++ b/third_party/forked/etcd221/wal/wal.go @@ -0,0 +1,548 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "errors" + "fmt" + "hash/crc32" + "io" + "os" + "path" + "reflect" + "sync" + "time" + + "k8s.io/kubernetes/third_party/forked/etcd221/pkg/fileutil" + + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal/walpb" + "github.com/coreos/pkg/capnslog" +) + +const ( + metadataType int64 = iota + 1 + entryType + stateType + crcType + snapshotType + + // the owner can make/remove files inside the directory + privateDirMode = 0700 + + // the expected size of each wal segment file. + // the actual size might be bigger than it. + segmentSizeBytes = 64 * 1000 * 1000 // 64MB +) + +var ( + plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "wal") + + ErrMetadataConflict = errors.New("wal: conflicting metadata found") + ErrFileNotFound = errors.New("wal: file not found") + ErrCRCMismatch = errors.New("wal: crc mismatch") + ErrSnapshotMismatch = errors.New("wal: snapshot mismatch") + ErrSnapshotNotFound = errors.New("wal: snapshot not found") + crcTable = crc32.MakeTable(crc32.Castagnoli) +) + +// WAL is a logical repersentation of the stable storage. +// WAL is either in read mode or append mode but not both. +// A newly created WAL is in append mode, and ready for appending records. +// A just opened WAL is in read mode, and ready for reading records. +// The WAL will be ready for appending after reading out all the previous records. +type WAL struct { + dir string // the living directory of the underlay files + metadata []byte // metadata recorded at the head of each WAL + state raftpb.HardState // hardstate recorded at the head of WAL + + start walpb.Snapshot // snapshot to start reading + decoder *decoder // decoder to decode records + + mu sync.Mutex + f *os.File // underlay file opened for appending, sync + seq uint64 // sequence of the wal file currently used for writes + enti uint64 // index of the last entry saved to the wal + encoder *encoder // encoder to encode records + + locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing) +} + +// Create creates a WAL ready for appending records. The given metadata is +// recorded at the head of each WAL file, and can be retrieved with ReadAll. +func Create(dirpath string, metadata []byte) (*WAL, error) { + if Exist(dirpath) { + return nil, os.ErrExist + } + + if err := os.MkdirAll(dirpath, privateDirMode); err != nil { + return nil, err + } + + p := path.Join(dirpath, walName(0, 0)) + f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) + if err != nil { + return nil, err + } + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return nil, err + } + err = l.Lock() + if err != nil { + return nil, err + } + + w := &WAL{ + dir: dirpath, + metadata: metadata, + seq: 0, + f: f, + encoder: newEncoder(f, 0), + } + w.locks = append(w.locks, l) + if err := w.saveCrc(0); err != nil { + return nil, err + } + if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { + return nil, err + } + if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { + return nil, err + } + return w, nil +} + +// Open opens the WAL at the given snap. +// The snap SHOULD have been previously saved to the WAL, or the following +// ReadAll will fail. +// The returned WAL is ready to read and the first record will be the one after +// the given snap. The WAL cannot be appended to before reading out all of its +// previous records. +func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) { + return openAtIndex(dirpath, snap, true) +} + +// OpenForRead only opens the wal files for read. +// Write on a read only wal panics. +func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) { + return openAtIndex(dirpath, snap, false) +} + +func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { + names, err := fileutil.ReadDir(dirpath) + if err != nil { + return nil, err + } + names = checkWalNames(names) + if len(names) == 0 { + return nil, ErrFileNotFound + } + + nameIndex, ok := searchIndex(names, snap.Index) + if !ok || !isValidSeq(names[nameIndex:]) { + return nil, ErrFileNotFound + } + + // open the wal files for reading + rcs := make([]io.ReadCloser, 0) + ls := make([]fileutil.Lock, 0) + for _, name := range names[nameIndex:] { + f, err := os.Open(path.Join(dirpath, name)) + if err != nil { + return nil, err + } + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return nil, err + } + err = l.TryLock() + if err != nil { + if write { + return nil, err + } + } + rcs = append(rcs, f) + ls = append(ls, l) + } + rc := MultiReadCloser(rcs...) + + // create a WAL ready for reading + w := &WAL{ + dir: dirpath, + start: snap, + decoder: newDecoder(rc), + locks: ls, + } + + if write { + // open the lastest wal file for appending + seq, _, err := parseWalName(names[len(names)-1]) + if err != nil { + rc.Close() + return nil, err + } + last := path.Join(dirpath, names[len(names)-1]) + + f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0) + if err != nil { + rc.Close() + return nil, err + } + err = fileutil.Preallocate(f, segmentSizeBytes) + if err != nil { + rc.Close() + plog.Errorf("failed to allocate space when creating new wal file (%v)", err) + return nil, err + } + + w.f = f + w.seq = seq + } + + return w, nil +} + +// ReadAll reads out records of the current WAL. +// If opened in write mode, it must read out all records until EOF. Or an error +// will be returned. +// If opened in read mode, it will try to read all records if possible. +// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. +// If loaded snap doesn't match with the expected one, it will return +// all the records and error ErrSnapshotMismatch. +// TODO: detect not-last-snap error. +// TODO: maybe loose the checking of match. +// After ReadAll, the WAL will be ready for appending new records. +func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + rec := &walpb.Record{} + decoder := w.decoder + + var match bool + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + switch rec.Type { + case entryType: + e := mustUnmarshalEntry(rec.Data) + if e.Index > w.start.Index { + ents = append(ents[:e.Index-w.start.Index-1], e) + } + w.enti = e.Index + case stateType: + state = mustUnmarshalState(rec.Data) + case metadataType: + if metadata != nil && !reflect.DeepEqual(metadata, rec.Data) { + state.Reset() + return nil, state, nil, ErrMetadataConflict + } + metadata = rec.Data + case crcType: + crc := decoder.crc.Sum32() + // current crc of decoder must match the crc of the record. + // do no need to match 0 crc, since the decoder is a new one at this case. + if crc != 0 && rec.Validate(crc) != nil { + state.Reset() + return nil, state, nil, ErrCRCMismatch + } + decoder.updateCRC(rec.Crc) + case snapshotType: + var snap walpb.Snapshot + pbutil.MustUnmarshal(&snap, rec.Data) + if snap.Index == w.start.Index { + if snap.Term != w.start.Term { + state.Reset() + return nil, state, nil, ErrSnapshotMismatch + } + match = true + } + default: + state.Reset() + return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type) + } + } + + switch w.f { + case nil: + // We do not have to read out all entries in read mode. + // The last record maybe a partial written one, so + // ErrunexpectedEOF might be returned. + if err != io.EOF && err != io.ErrUnexpectedEOF { + state.Reset() + return nil, state, nil, err + } + default: + // We must read all of the entries if WAL is opened in write mode. + if err != io.EOF { + state.Reset() + return nil, state, nil, err + } + } + + err = nil + if !match { + err = ErrSnapshotNotFound + } + + // close decoder, disable reading + w.decoder.close() + w.start = walpb.Snapshot{} + + w.metadata = metadata + + if w.f != nil { + // create encoder (chain crc with the decoder), enable appending + w.encoder = newEncoder(w.f, w.decoder.lastCRC()) + w.decoder = nil + lastIndexSaved.Set(float64(w.enti)) + } + + return metadata, state, ents, err +} + +// cut closes current file written and creates a new one ready to append. +// cut first creates a temp wal file and writes necessary headers into it. +// Then cut atomtically rename temp wal file to a wal file. +func (w *WAL) cut() error { + // close old wal file + if err := w.sync(); err != nil { + return err + } + if err := w.f.Close(); err != nil { + return err + } + + fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1)) + ftpath := fpath + ".tmp" + + // create a temp wal file with name sequence + 1, or tuncate the existing one + ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + + // update writer and save the previous crc + w.f = ft + prevCrc := w.encoder.crc.Sum32() + w.encoder = newEncoder(w.f, prevCrc) + if err := w.saveCrc(prevCrc); err != nil { + return err + } + if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil { + return err + } + if err := w.saveState(&w.state); err != nil { + return err + } + // close temp wal file + if err := w.sync(); err != nil { + return err + } + if err := w.f.Close(); err != nil { + return err + } + + // atomically move temp wal file to wal file + if err := os.Rename(ftpath, fpath); err != nil { + return err + } + + // open the wal file and update writer again + f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + return err + } + err = fileutil.Preallocate(f, segmentSizeBytes) + if err != nil { + plog.Errorf("failed to allocate space when creating new wal file (%v)", err) + return err + } + + w.f = f + prevCrc = w.encoder.crc.Sum32() + w.encoder = newEncoder(w.f, prevCrc) + + // lock the new wal file + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return err + } + + err = l.Lock() + if err != nil { + return err + } + w.locks = append(w.locks, l) + + // increase the wal seq + w.seq++ + + plog.Infof("segmented wal file %v is created", fpath) + return nil +} + +func (w *WAL) sync() error { + if w.encoder != nil { + if err := w.encoder.flush(); err != nil { + return err + } + } + start := time.Now() + err := w.f.Sync() + syncDurations.Observe(float64(time.Since(start).Nanoseconds() / int64(time.Microsecond))) + return err +} + +// ReleaseLockTo releases the locks, which has smaller index than the given index +// except the largest one among them. +// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release +// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4. +func (w *WAL) ReleaseLockTo(index uint64) error { + w.mu.Lock() + defer w.mu.Unlock() + + var smaller int + found := false + + for i, l := range w.locks { + _, lockIndex, err := parseWalName(path.Base(l.Name())) + if err != nil { + return err + } + if lockIndex >= index { + smaller = i - 1 + found = true + break + } + } + + // if no lock index is greater than the release index, we can + // release lock upto the last one(excluding). + if !found && len(w.locks) != 0 { + smaller = len(w.locks) - 1 + } + + if smaller <= 0 { + return nil + } + + for i := 0; i < smaller; i++ { + w.locks[i].Unlock() + w.locks[i].Destroy() + } + w.locks = w.locks[smaller:] + + return nil +} + +func (w *WAL) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.f != nil { + if err := w.sync(); err != nil { + return err + } + if err := w.f.Close(); err != nil { + return err + } + } + for _, l := range w.locks { + err := l.Unlock() + if err != nil { + plog.Errorf("failed to unlock during closing wal: %s", err) + } + err = l.Destroy() + if err != nil { + plog.Errorf("failed to destroy lock during closing wal: %s", err) + } + } + return nil +} + +func (w *WAL) saveEntry(e *raftpb.Entry) error { + // TODO: add MustMarshalTo to reduce one allocation. + b := pbutil.MustMarshal(e) + rec := &walpb.Record{Type: entryType, Data: b} + if err := w.encoder.encode(rec); err != nil { + return err + } + w.enti = e.Index + lastIndexSaved.Set(float64(w.enti)) + return nil +} + +func (w *WAL) saveState(s *raftpb.HardState) error { + if raft.IsEmptyHardState(*s) { + return nil + } + w.state = *s + b := pbutil.MustMarshal(s) + rec := &walpb.Record{Type: stateType, Data: b} + return w.encoder.encode(rec) +} + +func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { + w.mu.Lock() + defer w.mu.Unlock() + + // short cut, do not call sync + if raft.IsEmptyHardState(st) && len(ents) == 0 { + return nil + } + + // TODO(xiangli): no more reference operator + for i := range ents { + if err := w.saveEntry(&ents[i]); err != nil { + return err + } + } + if err := w.saveState(&st); err != nil { + return err + } + + fstat, err := w.f.Stat() + if err != nil { + return err + } + if fstat.Size() < segmentSizeBytes { + return w.sync() + } + // TODO: add a test for this code path when refactoring the tests + return w.cut() +} + +func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { + w.mu.Lock() + defer w.mu.Unlock() + + b := pbutil.MustMarshal(&e) + rec := &walpb.Record{Type: snapshotType, Data: b} + if err := w.encoder.encode(rec); err != nil { + return err + } + // update enti only when snapshot is ahead of last index + if w.enti < e.Index { + w.enti = e.Index + } + lastIndexSaved.Set(float64(w.enti)) + return w.sync() +} + +func (w *WAL) saveCrc(prevCrc uint32) error { + return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) +} diff --git a/third_party/forked/etcd221/wal/walpb/BUILD b/third_party/forked/etcd221/wal/walpb/BUILD new file mode 100644 index 0000000000000..29def44028e44 --- /dev/null +++ b/third_party/forked/etcd221/wal/walpb/BUILD @@ -0,0 +1,31 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "record.go", + "record.pb.go", + ], + tags = ["automanaged"], + deps = ["//vendor:github.com/gogo/protobuf/proto"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/third_party/forked/etcd221/wal/walpb/record.go b/third_party/forked/etcd221/wal/walpb/record.go new file mode 100644 index 0000000000000..bb5368569761c --- /dev/null +++ b/third_party/forked/etcd221/wal/walpb/record.go @@ -0,0 +1,29 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package walpb + +import "errors" + +var ( + ErrCRCMismatch = errors.New("walpb: crc mismatch") +) + +func (rec *Record) Validate(crc uint32) error { + if rec.Crc == crc { + return nil + } + rec.Reset() + return ErrCRCMismatch +} diff --git a/third_party/forked/etcd221/wal/walpb/record.pb.go b/third_party/forked/etcd221/wal/walpb/record.pb.go new file mode 100644 index 0000000000000..a9b38a47ef9be --- /dev/null +++ b/third_party/forked/etcd221/wal/walpb/record.pb.go @@ -0,0 +1,447 @@ +// Code generated by protoc-gen-gogo. +// source: record.proto +// DO NOT EDIT! + +/* + Package walpb is a generated protocol buffer package. + + It is generated from these files: + record.proto + + It has these top-level messages: + Record + Snapshot +*/ +package walpb + +import proto "github.com/gogo/protobuf/proto" +import math "math" + +// discarding unused import gogoproto "github.com/coreos/etcd/Godeps/_workspace/src/gogoproto" + +import io "io" +import fmt "fmt" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +type Record struct { + Type int64 `protobuf:"varint,1,opt,name=type" json:"type"` + Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"` + Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Record) Reset() { *m = Record{} } +func (m *Record) String() string { return proto.CompactTextString(m) } +func (*Record) ProtoMessage() {} + +type Snapshot struct { + Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"` + Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Snapshot) Reset() { *m = Snapshot{} } +func (m *Snapshot) String() string { return proto.CompactTextString(m) } +func (*Snapshot) ProtoMessage() {} + +func (m *Record) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Record) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRecord(data, i, uint64(m.Type)) + data[i] = 0x10 + i++ + i = encodeVarintRecord(data, i, uint64(m.Crc)) + if m.Data != nil { + data[i] = 0x1a + i++ + i = encodeVarintRecord(data, i, uint64(len(m.Data))) + i += copy(data[i:], m.Data) + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *Snapshot) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Snapshot) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRecord(data, i, uint64(m.Index)) + data[i] = 0x10 + i++ + i = encodeVarintRecord(data, i, uint64(m.Term)) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeFixed64Record(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Record(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintRecord(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *Record) Size() (n int) { + var l int + _ = l + n += 1 + sovRecord(uint64(m.Type)) + n += 1 + sovRecord(uint64(m.Crc)) + if m.Data != nil { + l = len(m.Data) + n += 1 + l + sovRecord(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Snapshot) Size() (n int) { + var l int + _ = l + n += 1 + sovRecord(uint64(m.Index)) + n += 1 + sovRecord(uint64(m.Term)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovRecord(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozRecord(x uint64) (n int) { + return sovRecord(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Record) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Type |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Crc", wireType) + } + m.Crc = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Crc |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRecord + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append([]byte{}, data[iNdEx:postIndex]...) + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRecord(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRecord + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + return nil +} +func (m *Snapshot) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) + } + m.Index = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Index |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType) + } + m.Term = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Term |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRecord(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRecord + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + return nil +} +func skipRecord(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthRecord + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipRecord(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthRecord = fmt.Errorf("proto: negative length found during unmarshaling") +) diff --git a/third_party/forked/etcd221/wal/walpb/record.proto b/third_party/forked/etcd221/wal/walpb/record.proto new file mode 100644 index 0000000000000..b694cb2338aaf --- /dev/null +++ b/third_party/forked/etcd221/wal/walpb/record.proto @@ -0,0 +1,20 @@ +syntax = "proto2"; +package walpb; + +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +message Record { + optional int64 type = 1 [(gogoproto.nullable) = false]; + optional uint32 crc = 2 [(gogoproto.nullable) = false]; + optional bytes data = 3; +} + +message Snapshot { + optional uint64 index = 1 [(gogoproto.nullable) = false]; + optional uint64 term = 2 [(gogoproto.nullable) = false]; +}