diff --git a/datatype/keyvalue/keyvalue.go b/datatype/keyvalue/keyvalue.go index d50b6ee5..c3c89006 100644 --- a/datatype/keyvalue/keyvalue.go +++ b/datatype/keyvalue/keyvalue.go @@ -71,6 +71,10 @@ $ dvid -stdin node put < data Puts stdin data into the keyvalue data instance under the given key. +$ dvid node dump-cloud + + Dumps all key-value pairs in the keyvalue data instance to the cloud storage bucket. + ------------------ @@ -575,40 +579,6 @@ func (d *Data) DeleteData(ctx storage.Context, keyStr string) error { return db.Delete(ctx, tk) } -// put handles a PUT command-line request. -func (d *Data) put(cmd datastore.Request, reply *datastore.Response) error { - if len(cmd.Command) < 5 { - return fmt.Errorf("the key name must be specified after 'put'") - } - if len(cmd.Input) == 0 { - return fmt.Errorf("no data was passed into standard input") - } - var uuidStr, dataName, cmdStr, keyStr string - cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &keyStr) - - _, versionID, err := datastore.MatchingUUID(uuidStr) - if err != nil { - return err - } - - // Store data - if !d.Versioned() { - // Map everything to root version. - versionID, err = datastore.GetRepoRootVersion(versionID) - if err != nil { - return err - } - } - ctx := datastore.NewVersionedCtx(d, versionID) - if err = d.PutData(ctx, keyStr, cmd.Input); err != nil { - return fmt.Errorf("error on put to key %q for keyvalue %q: %v", keyStr, d.DataName(), err) - } - - reply.Output = []byte(fmt.Sprintf("Put %d bytes into key %q for keyvalue %q, uuid %s\n", - len(cmd.Input), keyStr, d.DataName(), uuidStr)) - return nil -} - // JSONString returns the JSON for this Data's configuration func (d *Data) JSONString() (jsonStr string, err error) { m, err := json.Marshal(d) @@ -629,6 +599,8 @@ func (d *Data) DoRPC(request datastore.Request, reply *datastore.Response) error switch request.TypeCommand() { case "put": return d.put(request, reply) + case "dump-cloud": + return d.dumpCloud(request, reply) default: return fmt.Errorf("unknown command. Data '%s' [%s] does not support '%s' command", d.DataName(), d.TypeName(), request.TypeCommand()) diff --git a/datatype/keyvalue/rpc.go b/datatype/keyvalue/rpc.go new file mode 100644 index 00000000..89e922a3 --- /dev/null +++ b/datatype/keyvalue/rpc.go @@ -0,0 +1,77 @@ +package keyvalue + +import ( + "fmt" + + "github.com/janelia-flyem/dvid/datastore" + + _ "gocloud.dev/blob/gcsblob" + _ "gocloud.dev/blob/s3blob" +) + +// put handles a PUT command-line request. +func (d *Data) put(cmd datastore.Request, reply *datastore.Response) error { + if len(cmd.Command) < 5 { + return fmt.Errorf("the key name must be specified after 'put'") + } + if len(cmd.Input) == 0 { + return fmt.Errorf("no data was passed into standard input") + } + var uuidStr, dataName, cmdStr, keyStr string + cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &keyStr) + + _, versionID, err := datastore.MatchingUUID(uuidStr) + if err != nil { + return err + } + + // Store data + if !d.Versioned() { + // Map everything to root version. + versionID, err = datastore.GetRepoRootVersion(versionID) + if err != nil { + return err + } + } + ctx := datastore.NewVersionedCtx(d, versionID) + if err = d.PutData(ctx, keyStr, cmd.Input); err != nil { + return fmt.Errorf("error on put to key %q for keyvalue %q: %v", keyStr, d.DataName(), err) + } + + reply.Output = []byte(fmt.Sprintf("Put %d bytes into key %q for keyvalue %q, uuid %s\n", + len(cmd.Input), keyStr, d.DataName(), uuidStr)) + return nil +} + +// dumpCloud stores all key-value pairs in cloud storage. +func (d *Data) dumpCloud(cmd datastore.Request, reply *datastore.Response) error { + if len(cmd.Command) < 5 { + return fmt.Errorf("dump-cloud must be followed by ") + } + var uuidStr, dataName, cmdStr, refStr string + cmd.CommandArgs(1, &uuidStr, &dataName, &cmdStr, &refStr) + + _, versionID, err := datastore.MatchingUUID(uuidStr) + if err != nil { + return err + } + + if !d.Versioned() { + // Map everything to root version. + versionID, err = datastore.GetRepoRootVersion(versionID) + if err != nil { + return err + } + } + // ctx := datastore.NewVersionedCtx(d, versionID) + _ = datastore.NewVersionedCtx(d, versionID) + + // TODO: Dump to cloud store + + reply.Output = []byte(fmt.Sprintf("Dumped all key-values to cloud %q for keyvalue %q, uuid %s\n", + refStr, d.DataName(), uuidStr)) + return nil +} + +// TODO: Create transfer function using channels to get decent parallel write to cloud, assume +// getting from disk will be much faster than writing to cloud. diff --git a/storage/cloud.go b/storage/cloud.go new file mode 100644 index 00000000..f1c84c76 --- /dev/null +++ b/storage/cloud.go @@ -0,0 +1,90 @@ +package storage + +import ( + "context" + "fmt" + "strings" + + "github.com/janelia-flyem/dvid/dvid" + + "gocloud.dev/blob" + "gocloud.dev/blob/gcsblob" + "gocloud.dev/gcp" +) + +// OpenBucket returns a blob.Bucket for the given reference. +// The reference should be of the form: +// +// gcs:// +// s3:// +// vast:/// +func OpenBucket(ref string) (bucket *blob.Bucket, err error) { + ctx := context.Background() + + if strings.HasPrefix(ref, "s3://") { + // S3 handler contributed by Flatiron (pgunn) + // This relies on the non-GCS-specific blob API and requires that the user: + // A: Have set up AWS credentials in ways gocloud can find them (see the "aws config" command) + // B: Have set the AWS_REGION environment variable (usually to us-east-2) + bucket, err = blob.OpenBucket(ctx, ref) + if err != nil { + dvid.Errorf("Can't open bucket reference @ %q: %v\n", ref, err) + return nil, err + } + pathpart := strings.TrimPrefix(ref, "s3://") + pathpart = strings.SplitN(pathpart, "/", 2)[1] // Remove the bucket name + bucket = blob.PrefixedBucket(bucket, pathpart) + + } else if strings.HasPrefix(ref, "vast://") { + // VAST S3-compatible storage + // The ref should be of form "vast:///" where + // is the VAST endpoint and is the bucket name. + // The following endpoints must be set: + // AWS_REGION: ignored but must be set for this cross-cloud library + // AWS_SHARED_CREDENTIALS_FILE: path to a file with AWS credentials that + // should be in form: + // [default] + // aws_access_key_id = + // aws_secret_access_key = + ref := strings.TrimPrefix(ref, "vast://") + ref_parts := strings.SplitN(ref, "/", 2) + if len(ref_parts) != 2 { + return nil, fmt.Errorf("vast ref must be of form 'vast:///'") + } + endpoint := ref_parts[0] + bucketname := ref_parts[1] + url := fmt.Sprintf("s3://%s?endpoint=%s&s3ForcePathStyle=true", bucketname, endpoint) + bucket, err = blob.OpenBucket(ctx, url) + if err != nil { + dvid.Errorf("Can't open bucket reference @ %q: %v\n", ref, err) + return nil, err + } + + } else { + // In this case default to Google Store authentication as DVID did before + // See https://cloud.google.com/docs/authentication/production + // for more info on alternatives. + creds, err := gcp.DefaultCredentials(ctx) + if err != nil { + return nil, err + } + + // Create an HTTP client. + // This example uses the default HTTP transport and the credentials + // created above. + client, err := gcp.NewHTTPClient( + gcp.DefaultTransport(), + gcp.CredentialsTokenSource(creds)) + if err != nil { + return nil, err + } + + // Create a *blob.Bucket. + bucket, err = gcsblob.OpenBucket(ctx, client, ref, nil) + if err != nil { + fmt.Printf("Can't open bucket reference @ %q: %v\n", ref, err) + return nil, err + } + } + return bucket, nil +} diff --git a/storage/ngprecomputed/ngprecomputed.go b/storage/ngprecomputed/ngprecomputed.go index de98ecde..7b3fa823 100644 --- a/storage/ngprecomputed/ngprecomputed.go +++ b/storage/ngprecomputed/ngprecomputed.go @@ -16,11 +16,7 @@ import ( "github.com/blang/semver" "gocloud.dev/blob" - "gocloud.dev/blob/gcsblob" - _ "gocloud.dev/blob/gcsblob" - _ "gocloud.dev/blob/s3blob" "gocloud.dev/gcerrors" - "gocloud.dev/gcp" "github.com/janelia-flyem/dvid/dvid" "github.com/janelia-flyem/dvid/storage" @@ -145,75 +141,12 @@ func (e Engine) newStore(config dvid.StoreConfig) (*ngStore, bool, error) { // } dvid.Infof("Trying to open NG-Precomputed store @ %q ...\n", ref) - ctx := context.Background() - var bucket *blob.Bucket - - if strings.HasPrefix(ref, "s3://") { - // S3 handler contributed by Flatiron (pgunn) - // This relies on the non-GCS-specific blob API and requires that the user: - // A: Have set up AWS credentials in ways gocloud can find them (see the "aws config" command) - // B: Have set the AWS_REGION environment variable (usually to us-east-2) - bucket, err = blob.OpenBucket(ctx, ref) - if err != nil { - dvid.Errorf("Can't open NG precomputed @ %q: %v\n", ref, err) - return nil, false, err - } - pathpart := strings.TrimPrefix(ref, "s3://") - pathpart = strings.SplitN(pathpart, "/", 2)[1] // Remove the bucket name - bucket = blob.PrefixedBucket(bucket, pathpart) - - } else if strings.HasPrefix(ref, "vast://") { - // VAST S3-compatible storage - // The ref should be of form "vast:///" where - // is the VAST endpoint and is the bucket name. - // The following endpoints must be set: - // AWS_REGION: ignored but must be set for this cross-cloud library - // AWS_SHARED_CREDENTIALS_FILE: path to a file with AWS credentials that - // should be in form: - // [default] - // aws_access_key_id = - // aws_secret_access_key = - ref := strings.TrimPrefix(ref, "vast://") - ref_parts := strings.SplitN(ref, "/", 2) - if len(ref_parts) != 2 { - return nil, false, fmt.Errorf("vast ref must be of form 'vast:///'") - } - endpoint := ref_parts[0] - bucketname := ref_parts[1] - url := fmt.Sprintf("s3://%s?endpoint=%s&s3ForcePathStyle=true", bucketname, endpoint) - bucket, err = blob.OpenBucket(ctx, url) - if err != nil { - dvid.Errorf("Can't open NG precomputed @ %q: %v\n", ref, err) - return nil, false, err - } - - } else { - // In this case default to Google Store authentication as DVID did before - // See https://cloud.google.com/docs/authentication/production - // for more info on alternatives. - creds, err := gcp.DefaultCredentials(ctx) - if err != nil { - return nil, false, err - } - - // Create an HTTP client. - // This example uses the default HTTP transport and the credentials - // created above. - client, err := gcp.NewHTTPClient( - gcp.DefaultTransport(), - gcp.CredentialsTokenSource(creds)) - if err != nil { - return nil, false, err - } - - // Create a *blob.Bucket. - bucket, err = gcsblob.OpenBucket(ctx, client, ref, nil) - if err != nil { - fmt.Printf("Can't open NG precomputed @ %q: %v\n", ref, err) - return nil, false, err - } + bucket, err := storage.OpenBucket(ref) + if err != nil { + return nil, false, err } + ctx := context.Background() data, err := bucket.ReadAll(ctx, "info") if err != nil { return nil, false, err