diff --git a/datatype/keyvalue/keyvalue.go b/datatype/keyvalue/keyvalue.go index c3c89006..9605c36d 100644 --- a/datatype/keyvalue/keyvalue.go +++ b/datatype/keyvalue/keyvalue.go @@ -71,11 +71,12 @@ $ dvid -stdin node put < data Puts stdin data into the keyvalue data instance under the given key. -$ dvid node dump-cloud +$ dvid node dump-cloud [optional # of workers] Dumps all key-value pairs in the keyvalue data instance to the cloud storage bucket. + For authorization on cloud storage, the user must have set up the appropriate + credentials in environment variables. - ------------------ HTTP API (Level 2 REST): diff --git a/datatype/keyvalue/rpc.go b/datatype/keyvalue/rpc.go index 89e922a3..a4045209 100644 --- a/datatype/keyvalue/rpc.go +++ b/datatype/keyvalue/rpc.go @@ -1,10 +1,14 @@ package keyvalue import ( + "context" "fmt" "github.com/janelia-flyem/dvid/datastore" + "github.com/janelia-flyem/dvid/dvid" + "github.com/janelia-flyem/dvid/storage" + "gocloud.dev/blob" _ "gocloud.dev/blob/gcsblob" _ "gocloud.dev/blob/s3blob" ) @@ -46,10 +50,17 @@ func (d *Data) put(cmd datastore.Request, reply *datastore.Response) error { // dumpCloud stores all key-value pairs in cloud storage. func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error { if len(cmd.Command) < 5 { - return fmt.Errorf("dump-cloud must be followed by ") + return fmt.Errorf("dump-cloud must be followed by ") + } + var uuidStr, dataName, cmdStr, refStr, workersStr string + cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &refStr, &workersStr) + + var workers int = 20 + if workersStr != "" { + if _, err := fmt.Sscanf(workersStr, "%d", &workers); err != nil { + return fmt.Errorf("bad number of workers (%q): %v", workersStr, err) + } } - var uuidStr, dataName, cmdStr, refStr string - cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &refStr) _, versionID, err := datastore.MatchingUUID(uuidStr) if err != nil { @@ -63,15 +74,64 @@ func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error return err } } - // ctx := datastore.NewVersionedCtx(d, versionID) - _ = datastore.NewVersionedCtx(d, versionID) - // TODO: Dump to cloud store + bucket, err := storage.OpenBucket(refStr) + if err != nil { + return err + } + defer bucket.Close() + + // Start goroutines to write key-value pairs to cloud storage. + ch := make(chan *storage.TKeyValue, 1000) + for i := 0; i < workers; i++ { + go d.writeWorker(bucket, ch) + } + + // Send all key-values to the writer goroutines. + d.sendKVsToWriters(versionID, ch) reply.Output = []byte(fmt.Sprintf("Dumped all key-values to cloud %q for keyvalue %q, uuid %s\n", refStr, d.DataName(), uuidStr)) return nil } -// TODO: Create transfer function using channels to get decent parallel write to cloud, assume -// getting from disk will be much faster than writing to cloud. +// writeWorker writes key-value pairs to cloud storage. +func (d *Data) writeWorker(bucket *blob.Bucket, ch chan *storage.TKeyValue) { + for kv := range ch { + key, err := DecodeTKey(kv.K) + if err != nil { + dvid.Errorf("Error decoding TKey: %v\n", err) + continue + } + w, err := bucket.NewWriter(context.Background(), key, nil) + if err != nil { + dvid.Errorf("Error creating writer for key %q: %v\n", key, err) + continue + } + if _, err := w.Write(kv.V); err != nil { + dvid.Errorf("Error writing key %q to cloud bucket: %v\n", key, err) + continue + } + if err := w.Close(); err != nil { + dvid.Errorf("Error closing after key %q to cloud bucket: %v\n", key, err) + } + } +} + +// iterate through all key-value pairs and send them to the writer channel. +func (d *Data) sendKVsToWriters(versionID dvid.VersionID, ch chan *storage.TKeyValue) error { + ctx := datastore.NewVersionedCtx(d, versionID) + db, err := datastore.GetOrderedKeyValueDB(d) + if err != nil { + return err + } + first := storage.MinTKey(keyStandard) + last := storage.MaxTKey(keyStandard) + return db.ProcessRange(ctx, first, last, &storage.ChunkOp{}, func(c *storage.Chunk) error { + if c == nil || c.TKeyValue == nil || c.TKeyValue.V == nil { + return nil + } + ch <- c.TKeyValue + return nil + }) +}