Skip to content

Commit

Permalink
Provide way to convert from one sharding function to another.
Browse files Browse the repository at this point in the history
  • Loading branch information
kevina committed Jan 19, 2017
1 parent d2040b5 commit e7a3a27
Show file tree
Hide file tree
Showing 4 changed files with 360 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ install: true
script:
- make deps
- gx-go rewrite
- go test -race -coverprofile=unittest.coverprofile -covermode=atomic ./...
- go test -race -coverprofile=unittest.coverprofile -covermode=atomic .


after_success:
Expand Down
173 changes: 173 additions & 0 deletions convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Package flatfs is a Datastore implementation that stores all
// objects in a two-level directory structure in the local file
// system, regardless of the hierarchy of the keys.
package flatfs

import (
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/jbenet/go-os-rename"
)

func UpgradeV0toV1(path string, prefixLen int) error {
fun := Prefix(prefixLen)
err := WriteShardFunc(path, fun)
if err != nil {
return err
}
err = WriteReadme(path, fun)
if err != nil {
return err
}
return nil
}

func DowngradeV1toV0(path string) error {
err := os.Remove(filepath.Join(path, SHARDING_FN))
if err != nil {
return err
}
err = os.Remove(filepath.Join(path, README_FN))
if err != nil && !os.IsNotExist(err) {
return err
}
return nil
}

func Move(oldPath string, newPath string, out io.Writer) error {
oldDS, err := Open(oldPath, false)
if err != nil {
return fmt.Errorf("%s: %v", oldPath, err)
}
newDS, err := Open(newPath, false)
if err != nil {
return fmt.Errorf("%s: %v", newPath, err)
}

if out != nil {
fmt.Fprintf(out, "Getting Keys...\n")
}
res, err := oldDS.Query(query.Query{KeysOnly: true})
if err != nil {
return err
}
entries, err := res.Rest()
if err != nil {
return err
}
prog := newProgress(len(entries), out)

if out != nil {
fmt.Fprintf(out, "Moving Keys...\n")
}

// first move the keys
for _, e := range entries {
err := moveKey(oldDS, newDS, datastore.RawKey(e.Key))
if err != nil {
return err
}
prog.Next()
}

if out != nil {
fmt.Fprintf(out, "\nCleaning Up...\n")
}

// now walk the old top-level directory
dir, err := os.Open(oldDS.path)
if err != nil {
return err
}
defer dir.Close()
names, err := dir.Readdirnames(-1)
if err != nil {
return err
}
for _, fn := range names {
if fn == "." || fn == ".." {
continue
}
oldPath := filepath.Join(oldDS.path, fn)
inf, err := os.Stat(oldPath)
if err != nil {
return err
}
if inf.IsDir() || fn == "SHARDING" || fn == "_README" {
// if we are a director or generated file just remove it
err := os.Remove(oldPath)
if err != nil {
return err
}
} else {
// else move it
newPath := filepath.Join(newDS.path, fn)
err := osrename.Rename(oldPath, newPath)
if err != nil {
return err
}
}
}

if out != nil {
fmt.Fprintf(out, "All Done.\n")
}

return nil
}

func moveKey(oldDS *Datastore, newDS *Datastore, key datastore.Key) error {
_, oldPath := oldDS.encode(key)
dir, newPath := newDS.encode(key)
err := newDS.makeDirNoSync(dir)
if err != nil {
return err
}
err = osrename.Rename(oldPath, newPath)
if err != nil {
return err
}
return nil
}

type progress struct {
total int
current int

out io.Writer

start time.Time
}

func newProgress(total int, out io.Writer) *progress {
return &progress{
total: total,
start: time.Now(),
out: out,
}
}

func (p *progress) Next() {
p.current++
if p.out == nil {
return
}
if p.current%10 == 0 || p.current == p.total {
fmt.Fprintf(p.out, "\r[%d / %d]", p.current, p.total)
}

if p.current%100 == 0 || p.current == p.total {
took := time.Now().Sub(p.start)
av := took / time.Duration(p.current)
estim := av * time.Duration(p.total-p.current)
//est := strings.Split(estim.String(), ".")[0]

fmt.Fprintf(p.out, " Approx time remaining: %s ", estim)
}
}
90 changes: 90 additions & 0 deletions convert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package flatfs_test

import (
"bytes"
"encoding/hex"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-ds-flatfs"
//"gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"

rand "github.com/dustin/randbo"
)

func TestMove(t *testing.T) {
tempdir, cleanup := tempdir(t)
defer cleanup()

v1dir := filepath.Join(tempdir, "v1")
err := flatfs.Create(v1dir, flatfs.Prefix(3))
if err != nil {
t.Fatalf("Create fail: %v\n", err)
}
err = ioutil.WriteFile(filepath.Join(v1dir, "README_ALSO"), []byte("something"), 0666)
if err != nil {
t.Fatalf("WriteFile fail: %v\n", err)
}
v1, err := flatfs.Open(v1dir, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}

r := rand.New()
var blocks [][]byte
var keys []datastore.Key
for i := 0; i < 256; i++ {
blk := make([]byte, 1000)
r.Read(blk)
blocks = append(blocks, blk)

key := "x" + hex.EncodeToString(blk[:8])
keys = append(keys, datastore.NewKey(key))
err := v1.Put(keys[i], blocks[i])
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
}

v2dir := filepath.Join(tempdir, "v2")
err = flatfs.Create(v2dir, flatfs.NextToLast(2))
if err != nil {
t.Fatalf("Mkdir fail: %v\n", err)
}
flatfs.Move(v1dir, v2dir, nil)

// This should fail if the directory is not empty
err = os.Remove(v1dir)
if err != nil {
t.Fatalf("Remove fail: %v\n", err)
}

// Make sure the README file moved
_, err = os.Stat(filepath.Join(v2dir, "README_ALSO"))
if err != nil {
t.Fatalf(err.Error())
}

v2, err := flatfs.Open(v2dir, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}

// Sanity check, make sure we can retrieve a new key
data, err := v2.Get(keys[0])
if err != nil {
t.Fatalf("Get fail: %v\n", err)
}
if !bytes.Equal(data.([]byte), blocks[0]) {
t.Fatalf("block context differ for key %s\n", keys[0].String())
}

shard := filepath.Join(v2dir, flatfs.NextToLast(2).Func()(keys[0].String()))
_, err = os.Stat(shard)
if err != nil {
t.Fatalf(err.Error())
}
}
96 changes: 96 additions & 0 deletions flatfs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"fmt"
"os"
"strconv"

"github.com/ipfs/go-ds-flatfs"
)

// To convert from the old format to a new format with a different
// sharding function use:
// flatfs upgrade blocks 5
// flatfs create blocks-new v1/next-to-last/2
// flatfs move blocks blocks-new
// rmdir blocks
// mv blocks-new blocks
// to do the reverse
// flatfs create blocks-new v1/prefix/5
// flatfs move blocks blocks-new
// rmdir blocks
// mv blocks-new blocks
// flatfs downgrade blocks

func usage() {
fmt.Fprintf(os.Stderr, "usage: %s create DIR SHARDFUN | upgrade DIR PREFIXLEN | downgrade DIR | move OLDDIR NEWDIR\n", os.Args[0])
os.Exit(1)
}

func fail(err error) {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}

func main() {
if len(os.Args) < 2 {
usage()
}

switch os.Args[1] {
case "create":
if len(os.Args) != 4 {
usage()
}
dir := os.Args[2]
funStr := os.Args[3]
if funStr[0] != '/' {
if funStr[0] != 'v' { // and version if not provided
funStr = "v1/" + funStr
}
funStr = flatfs.PREFIX + funStr
}
fun, err := flatfs.ParseShardFunc(funStr)
if err != nil {
fail(err)
}
err = flatfs.Create(dir, fun)
if err != nil {
fail(err)
}
case "upgrade":
if len(os.Args) != 4 {
usage()
}
dir := os.Args[2]
prefixLen, err := strconv.Atoi(os.Args[3])
if err != nil {
fail(err)
}
err = flatfs.UpgradeV0toV1(dir, prefixLen)
if err != nil {
fail(err)
}
case "downgrade":
if len(os.Args) != 3 {
usage()
}
dir := os.Args[2]
err := flatfs.DowngradeV1toV0(dir)
if err != nil {
fail(err)
}
case "move":
if len(os.Args) != 4 {
usage()
}
oldDir := os.Args[2]
newDir := os.Args[3]
err := flatfs.Move(oldDir, newDir, os.Stderr)
if err != nil {
fail(err)
}
default:
usage()
}
}

0 comments on commit e7a3a27

Please sign in to comment.