Skip to content

Commit

Permalink
add setup for dumpCloud rpc in keyvalue
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed Aug 3, 2024
1 parent 6610219 commit 6c57b38
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 105 deletions.
40 changes: 6 additions & 34 deletions datatype/keyvalue/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ $ dvid -stdin node <UUID> <data name> put <key> < data
Puts stdin data into the keyvalue data instance under the given key.
$ dvid node <UUID> <data name> dump-cloud <bucket>
Dumps all key-value pairs in the keyvalue data instance to the cloud storage bucket.
------------------
Expand Down Expand Up @@ -575,40 +579,6 @@ func (d *Data) DeleteData(ctx storage.Context, keyStr string) error {
return db.Delete(ctx, tk)
}

// put handles a PUT command-line request.
func (d *Data) put(cmd datastore.Request, reply *datastore.Response) error {
if len(cmd.Command) < 5 {
return fmt.Errorf("the key name must be specified after 'put'")
}
if len(cmd.Input) == 0 {
return fmt.Errorf("no data was passed into standard input")
}
var uuidStr, dataName, cmdStr, keyStr string
cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &keyStr)

_, versionID, err := datastore.MatchingUUID(uuidStr)
if err != nil {
return err
}

// Store data
if !d.Versioned() {
// Map everything to root version.
versionID, err = datastore.GetRepoRootVersion(versionID)
if err != nil {
return err
}
}
ctx := datastore.NewVersionedCtx(d, versionID)
if err = d.PutData(ctx, keyStr, cmd.Input); err != nil {
return fmt.Errorf("error on put to key %q for keyvalue %q: %v", keyStr, d.DataName(), err)
}

reply.Output = []byte(fmt.Sprintf("Put %d bytes into key %q for keyvalue %q, uuid %s\n",
len(cmd.Input), keyStr, d.DataName(), uuidStr))
return nil
}

// JSONString returns the JSON for this Data's configuration
func (d *Data) JSONString() (jsonStr string, err error) {
m, err := json.Marshal(d)
Expand All @@ -629,6 +599,8 @@ func (d *Data) DoRPC(request datastore.Request, reply *datastore.Response) error
switch request.TypeCommand() {
case "put":
return d.put(request, reply)
case "dump-cloud":
return d.dumpCloud(request, reply)
default:
return fmt.Errorf("unknown command. Data '%s' [%s] does not support '%s' command",
d.DataName(), d.TypeName(), request.TypeCommand())
Expand Down
77 changes: 77 additions & 0 deletions datatype/keyvalue/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package keyvalue

import (
"fmt"

"github.com/janelia-flyem/dvid/datastore"

_ "gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/s3blob"
)

// put handles a PUT command-line request.
func (d *Data) put(cmd datastore.Request, reply *datastore.Response) error {
if len(cmd.Command) < 5 {
return fmt.Errorf("the key name must be specified after 'put'")
}
if len(cmd.Input) == 0 {
return fmt.Errorf("no data was passed into standard input")
}
var uuidStr, dataName, cmdStr, keyStr string
cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &keyStr)

_, versionID, err := datastore.MatchingUUID(uuidStr)
if err != nil {
return err
}

// Store data
if !d.Versioned() {
// Map everything to root version.
versionID, err = datastore.GetRepoRootVersion(versionID)
if err != nil {
return err
}
}
ctx := datastore.NewVersionedCtx(d, versionID)
if err = d.PutData(ctx, keyStr, cmd.Input); err != nil {
return fmt.Errorf("error on put to key %q for keyvalue %q: %v", keyStr, d.DataName(), err)
}

reply.Output = []byte(fmt.Sprintf("Put %d bytes into key %q for keyvalue %q, uuid %s\n",
len(cmd.Input), keyStr, d.DataName(), uuidStr))
return nil
}

// dumpCloud stores all key-value pairs in cloud storage.
func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error {
if len(cmd.Command) < 5 {
return fmt.Errorf("dump-cloud must be followed by <cloud path>")
}
var uuidStr, dataName, cmdStr, refStr string
cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &refStr)

_, versionID, err := datastore.MatchingUUID(uuidStr)
if err != nil {
return err
}

if !d.Versioned() {
// Map everything to root version.
versionID, err = datastore.GetRepoRootVersion(versionID)
if err != nil {
return err
}
}
// ctx := datastore.NewVersionedCtx(d, versionID)
_ = datastore.NewVersionedCtx(d, versionID)

// TODO: Dump to cloud store

reply.Output = []byte(fmt.Sprintf("Dumped all key-values to cloud %q for keyvalue %q, uuid %s\n",
refStr, d.DataName(), uuidStr))
return nil
}

// TODO: Create transfer function using channels to get decent parallel write to cloud, assume
// getting from disk will be much faster than writing to cloud.
90 changes: 90 additions & 0 deletions storage/cloud.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package storage

import (
"context"
"fmt"
"strings"

"github.com/janelia-flyem/dvid/dvid"

"gocloud.dev/blob"
"gocloud.dev/blob/gcsblob"
"gocloud.dev/gcp"
)

// OpenBucket returns a blob.Bucket for the given reference.
// The reference should be of the form:
//
// gcs://<bucketname>
// s3://<bucketname>
// vast://<endpoint>/<bucketname>
func OpenBucket(ref string) (bucket *blob.Bucket, err error) {
ctx := context.Background()

if strings.HasPrefix(ref, "s3://") {
// S3 handler contributed by Flatiron (pgunn)
// This relies on the non-GCS-specific blob API and requires that the user:
// A: Have set up AWS credentials in ways gocloud can find them (see the "aws config" command)
// B: Have set the AWS_REGION environment variable (usually to us-east-2)
bucket, err = blob.OpenBucket(ctx, ref)
if err != nil {
dvid.Errorf("Can't open bucket reference @ %q: %v\n", ref, err)
return nil, err
}
pathpart := strings.TrimPrefix(ref, "s3://")
pathpart = strings.SplitN(pathpart, "/", 2)[1] // Remove the bucket name
bucket = blob.PrefixedBucket(bucket, pathpart)

} else if strings.HasPrefix(ref, "vast://") {
// VAST S3-compatible storage
// The ref should be of form "vast://<endpoint>/<bucket>" where
// <endpoint> is the VAST endpoint and <bucket> is the bucket name.
// The following endpoints must be set:
// AWS_REGION: ignored but must be set for this cross-cloud library
// AWS_SHARED_CREDENTIALS_FILE: path to a file with AWS credentials that
// should be in form:
// [default]
// aws_access_key_id = <access key>
// aws_secret_access_key = <secret key>
ref := strings.TrimPrefix(ref, "vast://")
ref_parts := strings.SplitN(ref, "/", 2)
if len(ref_parts) != 2 {
return nil, fmt.Errorf("vast ref must be of form 'vast://<endpoint>/<bucket>'")
}
endpoint := ref_parts[0]
bucketname := ref_parts[1]
url := fmt.Sprintf("s3://%s?endpoint=%s&s3ForcePathStyle=true", bucketname, endpoint)
bucket, err = blob.OpenBucket(ctx, url)
if err != nil {
dvid.Errorf("Can't open bucket reference @ %q: %v\n", ref, err)
return nil, err
}

} else {
// In this case default to Google Store authentication as DVID did before
// See https://cloud.google.com/docs/authentication/production
// for more info on alternatives.
creds, err := gcp.DefaultCredentials(ctx)
if err != nil {
return nil, err
}

// Create an HTTP client.
// This example uses the default HTTP transport and the credentials
// created above.
client, err := gcp.NewHTTPClient(
gcp.DefaultTransport(),
gcp.CredentialsTokenSource(creds))
if err != nil {
return nil, err
}

// Create a *blob.Bucket.
bucket, err = gcsblob.OpenBucket(ctx, client, ref, nil)
if err != nil {
fmt.Printf("Can't open bucket reference @ %q: %v\n", ref, err)
return nil, err
}
}
return bucket, nil
}
75 changes: 4 additions & 71 deletions storage/ngprecomputed/ngprecomputed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ import (

"github.com/blang/semver"
"gocloud.dev/blob"
"gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/s3blob"
"gocloud.dev/gcerrors"
"gocloud.dev/gcp"

"github.com/janelia-flyem/dvid/dvid"
"github.com/janelia-flyem/dvid/storage"
Expand Down Expand Up @@ -145,75 +141,12 @@ func (e Engine) newStore(config dvid.StoreConfig) (*ngStore, bool, error) {
// }

dvid.Infof("Trying to open NG-Precomputed store @ %q ...\n", ref)
ctx := context.Background()
var bucket *blob.Bucket

if strings.HasPrefix(ref, "s3://") {
// S3 handler contributed by Flatiron (pgunn)
// This relies on the non-GCS-specific blob API and requires that the user:
// A: Have set up AWS credentials in ways gocloud can find them (see the "aws config" command)
// B: Have set the AWS_REGION environment variable (usually to us-east-2)
bucket, err = blob.OpenBucket(ctx, ref)
if err != nil {
dvid.Errorf("Can't open NG precomputed @ %q: %v\n", ref, err)
return nil, false, err
}
pathpart := strings.TrimPrefix(ref, "s3://")
pathpart = strings.SplitN(pathpart, "/", 2)[1] // Remove the bucket name
bucket = blob.PrefixedBucket(bucket, pathpart)

} else if strings.HasPrefix(ref, "vast://") {
// VAST S3-compatible storage
// The ref should be of form "vast://<endpoint>/<bucket>" where
// <endpoint> is the VAST endpoint and <bucket> is the bucket name.
// The following endpoints must be set:
// AWS_REGION: ignored but must be set for this cross-cloud library
// AWS_SHARED_CREDENTIALS_FILE: path to a file with AWS credentials that
// should be in form:
// [default]
// aws_access_key_id = <access key>
// aws_secret_access_key = <secret key>
ref := strings.TrimPrefix(ref, "vast://")
ref_parts := strings.SplitN(ref, "/", 2)
if len(ref_parts) != 2 {
return nil, false, fmt.Errorf("vast ref must be of form 'vast://<endpoint>/<bucket>'")
}
endpoint := ref_parts[0]
bucketname := ref_parts[1]
url := fmt.Sprintf("s3://%s?endpoint=%s&s3ForcePathStyle=true", bucketname, endpoint)
bucket, err = blob.OpenBucket(ctx, url)
if err != nil {
dvid.Errorf("Can't open NG precomputed @ %q: %v\n", ref, err)
return nil, false, err
}

} else {
// In this case default to Google Store authentication as DVID did before
// See https://cloud.google.com/docs/authentication/production
// for more info on alternatives.
creds, err := gcp.DefaultCredentials(ctx)
if err != nil {
return nil, false, err
}

// Create an HTTP client.
// This example uses the default HTTP transport and the credentials
// created above.
client, err := gcp.NewHTTPClient(
gcp.DefaultTransport(),
gcp.CredentialsTokenSource(creds))
if err != nil {
return nil, false, err
}

// Create a *blob.Bucket.
bucket, err = gcsblob.OpenBucket(ctx, client, ref, nil)
if err != nil {
fmt.Printf("Can't open NG precomputed @ %q: %v\n", ref, err)
return nil, false, err
}
bucket, err := storage.OpenBucket(ref)
if err != nil {
return nil, false, err
}

ctx := context.Background()
data, err := bucket.ReadAll(ctx, "info")
if err != nil {
return nil, false, err
Expand Down

0 comments on commit 6c57b38

Please sign in to comment.