Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add log abstraction to store records in disk #15

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 71 additions & 24 deletions db/backend.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,61 @@
package db

import (
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/go-distributed/xtree/db/recordio"
dblog "github.com/go-distributed/xtree/db/log"
"github.com/go-distributed/xtree/db/message"
"github.com/go-distributed/xtree/third-party/github.com/google/btree"
)

type backend struct {
bt *btree.BTree
cache *cache
rev int
fc recordio.Fetcher
ap recordio.Appender
bt *btree.BTree
cache *cache
rev int
dblog *dblog.DBLog
config *DBConfig
}

func newBackend() *backend {
bt := btree.New(10)

// temporary file IO to test in-disk values
writeFile, err := ioutil.TempFile("", "backend")
dataDir, err := ioutil.TempDir("", "backend")
if err != nil {
panic("can't create temp file")
panic("not implemented")
}

config := &DBConfig{
DataDir: dataDir,
}
readFile, err := os.Open(writeFile.Name())
b, err := newBackendWithConfig(config)
if err != nil {
panic("can't open temp file")
panic("not implemented")
}
return b
}

return &backend{
bt: bt,
cache: newCache(),
fc: recordio.NewFetcher(readFile),
ap: recordio.NewAppender(writeFile),
func newBackendWithConfig(config *DBConfig) (b *backend, err error) {
bt := btree.New(10)
b = &backend{
bt: bt,
cache: newCache(),
config: config,
}
haveLog := dblog.Exist(config.DataDir)
switch haveLog {
case false:
fmt.Println("didn't have log file. Init...")
err = b.init(config)
case true:
fmt.Println("had log file. Restore...")
err = b.restore(config)
}
return
}

func (b *backend) getData(offset int64) []byte {
rec, err := b.fc.Fetch(offset)
rec, err := b.dblog.GetRecord(offset)
if err != nil {
panic("unimplemented")
}
Expand Down Expand Up @@ -87,7 +103,10 @@ func (b *backend) Put(rev int, path Path, data []byte) {
}

b.rev++
offset, err := b.ap.Append(recordio.Record{data})
offset, err := b.dblog.Append(&message.Record{
Key: path.p,
Data: data,
})
if err != nil {
panic("unimplemented")
}
Expand All @@ -96,8 +115,8 @@ func (b *backend) Put(rev int, path Path, data []byte) {
}

// one-level listing
func (b *backend) Ls(pathname string) []Path {
result := make([]Path, 0)
func (b *backend) Ls(pathname string) (paths []Path) {
paths = make([]Path, 0)
pivot := newPathForLs(pathname)

b.bt.AscendGreaterOrEqual(pivot, func(treeItem btree.Item) bool {
Expand All @@ -106,8 +125,36 @@ func (b *backend) Ls(pathname string) []Path {
p.level != pivot.level {
return false
}
result = append(result, *p)
paths = append(paths, *p)
return true
})
return result

return
}

// init() creates a new log file
func (b *backend) init(config *DBConfig) (err error) {
b.dblog, err = dblog.Create(config.DataDir)
return
}

// restore() restores database from the log file.
func (b *backend) restore(config *DBConfig) (err error) {
rev := 0
return dblog.Reuse(config.DataDir,
func(l *dblog.DBLog) {
b.dblog = l
},
func(r *message.Record) (err error) {
rev++
p := newPath(r.Key)
b.Put(rev, *p, r.Data)
return
})
}

// clean up resource after testing
func (b *backend) testableCleanupResource() (err error) {
b.dblog.Close()
return os.RemoveAll(b.config.DataDir)
}
58 changes: 52 additions & 6 deletions db/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestPut(t *testing.T) {
}

b := newBackend()
defer b.testableCleanupResource()
for i, tt := range tests {
b.Put(tt.rev, tt.path, tt.data)
v := b.Get(tt.rev, tt.path)
Expand All @@ -27,6 +28,7 @@ func TestPut(t *testing.T) {
t.Errorf("#%d: data = %s, want %s", i, v.data, tt.data)
}
}

}

func TestPutOnExistingPath(t *testing.T) {
Expand All @@ -40,6 +42,7 @@ func TestPutOnExistingPath(t *testing.T) {
}

b := newBackend()
defer b.testableCleanupResource()
for i, tt := range tests {
b.Put(2*i+1, tt.path, tt.data1)
v := b.Get(2*i+1, tt.path)
Expand All @@ -65,6 +68,8 @@ func TestPutOnExistingPath(t *testing.T) {

func TestGetMVCC(t *testing.T) {
b := newBackend()
defer b.testableCleanupResource()

b.Put(1, *newPath("/a"), []byte("1"))
b.Put(2, *newPath("/b"), []byte("2"))
b.Put(3, *newPath("/a"), []byte("3"))
Expand Down Expand Up @@ -99,12 +104,15 @@ func TestGetMVCC(t *testing.T) {
}

func TestLs(t *testing.T) {
back := newBackend()
d := []byte("somedata")
back.Put(1, *newPath("/a"), d)
back.Put(2, *newPath("/a/b"), d)
back.Put(3, *newPath("/a/c"), d)
back.Put(4, *newPath("/b"), d)

b := newBackend()
defer b.testableCleanupResource()

b.Put(1, *newPath("/a"), d)
b.Put(2, *newPath("/a/b"), d)
b.Put(3, *newPath("/a/c"), d)
b.Put(4, *newPath("/b"), d)

tests := []struct {
p string
Expand All @@ -118,7 +126,7 @@ func TestLs(t *testing.T) {
{"/c", []string{}},
}
for i, tt := range tests {
ps := back.Ls(tt.p)
ps := b.Ls(tt.p)
if len(ps) != len(tt.wps) {
t.Fatalf("#%d: len(ps) = %d, want %d", i, len(ps), len(tt.wps))
}
Expand All @@ -130,9 +138,44 @@ func TestLs(t *testing.T) {
}
}

func TestRestore(t *testing.T) {
tests := []struct {
rev int
path Path
data []byte
}{
{1, *newPath("/foo/bar"), []byte("somedata")},
{2, *newPath("/bar/foo"), []byte("datasome")},
}

b := newBackend()
for _, tt := range tests {
// append records to the log
b.Put(tt.rev, tt.path, tt.data)
}
b.dblog.Close()

// simulate restoring log in another backend
b2, err := newBackendWithConfig(b.config)
defer b2.testableCleanupResource()
if err != nil {
t.Errorf("newBackendWithConfig failed: %v", err)
}
for i, tt := range tests {
v := b2.Get(tt.rev, tt.path)
if v.rev != tt.rev {
t.Errorf("#%d: rev = %d, want %d", i, v.rev, tt.rev)
}
if !reflect.DeepEqual(v.data, tt.data) {
t.Errorf("#%d: data = %s, want %s", i, v.data, tt.data)
}
}
}

func BenchmarkPut(b *testing.B) {
b.StopTimer()
back := newBackend()
defer back.testableCleanupResource()
d := []byte("somedata")
path := make([]Path, b.N)
for i := range path {
Expand All @@ -148,6 +191,8 @@ func BenchmarkPut(b *testing.B) {
func BenchmarkGetWithCache(b *testing.B) {
b.StopTimer()
back := newBackend()
defer back.testableCleanupResource()

d := []byte("somedata")
path := make([]Path, b.N)
for i := range path {
Expand All @@ -168,6 +213,7 @@ func BenchmarkGetWithCache(b *testing.B) {
func BenchmarkGetWithOutCache(b *testing.B) {
b.StopTimer()
back := newBackend()
defer back.testableCleanupResource()
back.cache = nil
d := []byte("somedata")
path := make([]Path, b.N)
Expand Down
5 changes: 5 additions & 0 deletions db/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package db

// DBConfig holds the configuration used to create a database backend.
type DBConfig struct {
	// DataDir is the directory in which the backend keeps its log files.
	DataDir string
}
2 changes: 1 addition & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type DB interface {
// Otherwise, it lists recursively all paths.
//
// if count is >= 0, it is the number of paths we want in the list.
// if count is -1, it means any.
// if count is -1, it means all.
//
// if it failed, an error is returned.
Ls(rev int, path string, recursive bool, count int) ([]Path, error)
Expand Down
34 changes: 34 additions & 0 deletions db/log/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package log

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"

	"github.com/go-distributed/xtree/db/message"
)

// decoder reads length-prefixed records from an underlying stream.
type decoder struct {
	br *bufio.Reader // buffered view of the log stream
}

func newDecoder(r io.Reader) *decoder {
return &decoder{bufio.NewReader(r)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, no need to double-buffer reads.

}

// decode reads one frame from the stream and unmarshals it into r.
//
// Frame layout (see encoder.encode): an int64 little-endian length
// followed by that many bytes of marshaled record data.
func (d *decoder) decode(r *message.Record) (err error) {
	var l int64
	if l, err = readInt64(d.br); err != nil {
		return
	}
	// Guard against corrupt frames: a negative length would make
	// make([]byte, l) panic instead of surfacing a decode error.
	if l < 0 {
		return fmt.Errorf("log: invalid record length %d", l)
	}
	data := make([]byte, l)
	if _, err = io.ReadFull(d.br, data); err != nil {
		return
	}
	return r.Unmarshal(data)
}

func readInt64(r io.Reader) (n int64, err error) {
err = binary.Read(r, binary.LittleEndian, &n)
return
}
37 changes: 37 additions & 0 deletions db/log/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package log

import (
"bufio"
"encoding/binary"
"io"

"github.com/go-distributed/xtree/db/message"
)

// encoder writes length-prefixed records to an underlying stream.
// Writes are buffered and are not visible downstream until flush is called.
type encoder struct {
	bw *bufio.Writer // buffered view of the log stream
}

func newEncoder(w io.Writer) *encoder {
return &encoder{bufio.NewWriter(w)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to have bufio here since the OS already buffers IO for you. Double buffering is evil in a database.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's interesting. We need to investigate more regarding buffering behavior.

It currently uses bufio to write a single request: (length | record).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Buffer should only be done in kernel space.

In kernel space, OS's page cache will buffer any incoming writes until you fsync on the file or until the flusher thread kicks in. There is no point buffering them in user space unless you want to speed up the reads for buffered data.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't about buffering. It's just a way to construct a message to be written into disk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still construct the message with io.Writer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we don't buffer stuff until we know there is a performance issue without buffering? : ) The code will be much simpler.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. You can keep a downstream branch without buffer. I don't understand this part in OS so I will keep it in upstream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not upstream; this is just a pull request towards upstream. I'm just pointing out things that don't make sense in the pull request.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a follow up on this. According to KAFKA doc, "As a result of these factors using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure". It sounds OS page-cache reliance is a better option. And thanks for the discussion here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing the due diligence before adding a feature :)

To clarify, most of the stuff we were discussing here is buffering rather than caching. Caching is a superset of buffering in the sense that you can read from a cache but cannot read from a bufio writer.

}

// encode marshals r and appends it to the stream as one frame:
// an int64 little-endian byte count followed by the marshaled
// record bytes. Call flush to push the frame to the writer.
func (e *encoder) encode(r *message.Record) error {
	payload, merr := r.Marshal()
	if merr != nil {
		return merr
	}
	if werr := writeInt64(e.bw, int64(len(payload))); werr != nil {
		return werr
	}
	_, werr := e.bw.Write(payload)
	return werr
}

// flush writes any buffered frames through to the underlying writer.
func (e *encoder) flush() error {
	return e.bw.Flush()
}

func writeInt64(w io.Writer, n int64) error {
return binary.Write(w, binary.LittleEndian, n)
}
Loading