Skip to content

Commit

Permalink
add logic for dumping kv to cloud for keyvalue
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed Aug 4, 2024
1 parent 6c57b38 commit eb1f60f
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 10 deletions.
5 changes: 3 additions & 2 deletions datatype/keyvalue/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ $ dvid -stdin node <UUID> <data name> put <key> < data
Puts stdin data into the keyvalue data instance under the given key.
$ dvid node <UUID> <data name> dump-cloud <bucket>
$ dvid node <UUID> <data name> dump-cloud <bucket> [optional # of workers]
Dumps all key-value pairs in the keyvalue data instance to the cloud storage bucket.
For authorization on cloud storage, the user must have set up the appropriate
credentials in environment variables.
------------------
HTTP API (Level 2 REST):
Expand Down
76 changes: 68 additions & 8 deletions datatype/keyvalue/rpc.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package keyvalue

import (
"context"
"fmt"

"github.com/janelia-flyem/dvid/datastore"
"github.com/janelia-flyem/dvid/dvid"
"github.com/janelia-flyem/dvid/storage"

"gocloud.dev/blob"
_ "gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/s3blob"
)
Expand Down Expand Up @@ -46,10 +50,17 @@ func (d *Data) put(cmd datastore.Request, reply *datastore.Response) error {
// dumpCloud stores all key-value pairs in cloud storage.
func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error {
if len(cmd.Command) < 5 {
return fmt.Errorf("dump-cloud must be followed by <cloud path>")
return fmt.Errorf("dump-cloud must be followed by <cloud path> <optional # workers>")
}
var uuidStr, dataName, cmdStr, refStr, workersStr string
cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &refStr, &workersStr)

var workers int = 20
if workersStr != "" {
if _, err := fmt.Sscanf(workersStr, "%d", &workers); err != nil {
return fmt.Errorf("bad number of workers (%q): %v", workersStr, err)
}
}
var uuidStr, dataName, cmdStr, refStr string
cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &refStr)

_, versionID, err := datastore.MatchingUUID(uuidStr)
if err != nil {
Expand All @@ -63,15 +74,64 @@ func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error
return err
}
}
// ctx := datastore.NewVersionedCtx(d, versionID)
_ = datastore.NewVersionedCtx(d, versionID)

// TODO: Dump to cloud store
bucket, err := storage.OpenBucket(refStr)
if err != nil {
return err
}
defer bucket.Close()

// Start goroutines to write key-value pairs to cloud storage.
ch := make(chan *storage.TKeyValue, 1000)
for i := 0; i < workers; i++ {
go d.writeWorker(bucket, ch)
}

// Send all key-values to the writer goroutines.
d.sendKVsToWriters(versionID, ch)

reply.Output = []byte(fmt.Sprintf("Dumped all key-values to cloud %q for keyvalue %q, uuid %s\n",
refStr, d.DataName(), uuidStr))
return nil
}

// TODO: Create transfer function using channels to get decent parallel write to cloud, assume
// getting from disk will be much faster than writing to cloud.
// writeWorker receives key-value pairs on ch and writes each one to the
// given cloud storage bucket, using the decoded key as the blob name.
// Errors are logged and the offending pair skipped so one bad pair does
// not halt the dump. The worker exits when ch is closed and drained.
func (d *Data) writeWorker(bucket *blob.Bucket, ch chan *storage.TKeyValue) {
	for kv := range ch {
		key, err := DecodeTKey(kv.K)
		if err != nil {
			dvid.Errorf("Error decoding TKey: %v\n", err)
			continue
		}
		w, err := bucket.NewWriter(context.Background(), key, nil)
		if err != nil {
			dvid.Errorf("Error creating writer for key %q: %v\n", key, err)
			continue
		}
		if _, err := w.Write(kv.V); err != nil {
			dvid.Errorf("Error writing key %q to cloud bucket: %v\n", key, err)
			// The writer must always be closed, even after a failed Write,
			// to release its resources and cancel the partial upload.
			w.Close()
			continue
		}
		if err := w.Close(); err != nil {
			dvid.Errorf("Error closing after key %q to cloud bucket: %v\n", key, err)
		}
	}
}

// sendKVsToWriters iterates over every key-value pair in this data
// instance at the given version and sends each pair to ch for the
// writeWorker goroutines to upload. As the sole sender, it closes ch
// when iteration finishes (or fails) so the workers can terminate
// instead of blocking forever on an empty channel. Returns any error
// from obtaining the database or iterating the key range.
func (d *Data) sendKVsToWriters(versionID dvid.VersionID, ch chan *storage.TKeyValue) error {
	// Only the sender may close the channel; do it on every exit path.
	defer close(ch)

	ctx := datastore.NewVersionedCtx(d, versionID)
	db, err := datastore.GetOrderedKeyValueDB(d)
	if err != nil {
		return err
	}
	first := storage.MinTKey(keyStandard)
	last := storage.MaxTKey(keyStandard)
	return db.ProcessRange(ctx, first, last, &storage.ChunkOp{}, func(c *storage.Chunk) error {
		// Skip empty chunks so workers never see a nil value.
		if c == nil || c.TKeyValue == nil || c.TKeyValue.V == nil {
			return nil
		}
		ch <- c.TKeyValue
		return nil
	})
}

0 comments on commit eb1f60f

Please sign in to comment.