Skip to content

Commit

Permalink
v2beta release, using google life-sciences api (#107)
Browse files Browse the repository at this point in the history
Update pipelines-tool to use v2beta API

The v2beta Life Sciences API supersedes the v2alpha1 Genomics API.
The largest change is the introduction of a Location where metadata
is stored, so each request must now specify a cloud region in addition
to a project.
  • Loading branch information
alborzml authored Jan 16, 2020
1 parent e123f24 commit 41825be
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 52 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Google Genomics Pipelines Tools
# Google Lifesciences Pipelines Tools
[![Build Status](https://travis-ci.org/googlegenomics/pipelines-tools.svg?branch=master)](https://travis-ci.org/googlegenomics/pipelines-tools)

This repository contains various tools that are useful when running pipelines
with the [Google Genomics API][1].
with the [Google Lifesciences API][1].

# Quick Start Using Cloud Shell

1. Enable the Genomics API and the Compute Engine API in a new or existing
1. Enable the Lifesciences API and the Compute Engine API in a new or existing
Google Cloud project.
2. Start a [Cloud Shell][cloud-shell] inside your project.
3. Inside the Cloud Shell, run the command
Expand Down Expand Up @@ -97,9 +97,9 @@ output.

Please report problems using the issue tracker.

[1]: https://cloud.google.com/genomics
[1]: https://cloud.google.com/life-sciences/
[2]: https://golang.org/
[3]: https://github.com/googlegenomics/pipelines-tools/blob/master/pipelines/internal/commands/run/run.go#L18
[cloud-shell]: https://cloud.google.com/shell/docs/quickstart
[api-reference]: https://cloud.google.com/genomics/reference/rest/v2alpha1/pipelines/run
[api-reference]: https://cloud.google.com/life-sciences/docs/reference/rest/v2beta/projects.locations.pipelines/run
[gcs-fuse]: https://cloud.google.com/storage/docs/gcs-fuse
8 changes: 4 additions & 4 deletions pipelines/internal/commands/cancel/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ import (
"net/http"

"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
genomics "google.golang.org/api/genomics/v2alpha1"
"google.golang.org/api/googleapi"
genomics "google.golang.org/api/lifesciences/v2beta"
)

func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error {
func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error {
if len(arguments) < 1 {
return errors.New("missing operation name")
}

name := common.ExpandOperationName(project, arguments[0])
name := common.ExpandOperationName(project, location, arguments[0])
req := &genomics.CancelOperationRequest{}
if _, err := service.Projects.Operations.Cancel(name, req).Context(ctx).Do(); err != nil {
if _, err := service.Projects.Locations.Operations.Cancel(name, req).Context(ctx).Do(); err != nil {
if err, ok := err.(*googleapi.Error); ok && err.Code == http.StatusNotFound {
return fmt.Errorf("operation %q not found", name)
}
Expand Down
9 changes: 5 additions & 4 deletions pipelines/internal/commands/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"errors"
"flag"
"fmt"
"path"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
genomics "google.golang.org/api/genomics/v2alpha1"
genomics "google.golang.org/api/lifesciences/v2beta"
)

var (
Expand Down Expand Up @@ -72,15 +73,15 @@ type label struct {
Key, Value string
}

func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error {
func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error {
flags.Parse(arguments)

if *datasetName == "" || *tableName == "" {
return errors.New("dataset and table are required")
}

path := fmt.Sprintf("projects/%s/operations", project)
call := service.Projects.Operations.List(path).Context(ctx)
p := path.Join("projects", project, "locations", location)
call := service.Projects.Locations.Operations.List(p).Context(ctx)
call.PageSize(256)

bq, err := bigquery.NewClient(ctx, project)
Expand Down
9 changes: 5 additions & 4 deletions pipelines/internal/commands/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
"context"
"flag"
"fmt"
"path"
"strings"

genomics "google.golang.org/api/genomics/v2alpha1"
genomics "google.golang.org/api/lifesciences/v2beta"
)

var (
Expand All @@ -32,11 +33,11 @@ var (
all = flags.Bool("all", false, "show all operations (when false, show only running operations)")
)

func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error {
func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error {
flags.Parse(arguments)

path := fmt.Sprintf("projects/%s/operations", project)
call := service.Projects.Operations.List(path).Context(ctx)
p := path.Join("projects", project, "locations", location)
call := service.Projects.Locations.Operations.List(p).Context(ctx)

if !*all {
*filter = strings.Join([]string{*filter, "done=false"}, " ")
Expand Down
55 changes: 34 additions & 21 deletions pipelines/internal/commands/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ import (
"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
"golang.org/x/oauth2/google"
compute "google.golang.org/api/compute/v1"
genomics "google.golang.org/api/genomics/v2alpha1"
"google.golang.org/api/googleapi"
genomics "google.golang.org/api/lifesciences/v2beta"
)

var (
Expand Down Expand Up @@ -181,7 +181,7 @@ func init() {
flags.Var(&common.MapFlagValue{vmLabels}, "vm-labels", "label names and values to apply to the virtual machine")
}

func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error {
func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error {
filenames := common.ParseFlags(flags, arguments)

var filename string
Expand All @@ -207,18 +207,20 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu
return nil
}

return runPipeline(ctx, service, req)
return runPipeline(ctx, service, req, project, location)
}

func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.RunPipelineRequest) error {
func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.RunPipelineRequest, project, location string) error {
abort := make(chan os.Signal, 1)
signal.Notify(abort, os.Interrupt)

attempt := uint(1)
for {
req.Pipeline.Resources.VirtualMachine.Preemptible = (attempt <= *pvmAttempts)

lro, err := service.Pipelines.Run(req).Context(ctx).Do()
parent := path.Join("projects", project, "locations", location)
lro, err := service.Projects.Locations.Pipelines.Run(parent, req).Context(ctx).Do()

if err != nil {
if err, ok := err.(*googleapi.Error); ok && err.Message != "" {
return fmt.Errorf("starting pipeline: %q: %q", err.Message, err.Body)
Expand All @@ -237,7 +239,7 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
return nil
}

if err := watch.Invoke(ctx, service, req.Pipeline.Resources.ProjectId, []string{lro.Name}); err != nil {
if err := watch.Invoke(ctx, service, project, location, []string{lro.Name}); err != nil {
if err, ok := err.(common.PipelineExecutionError); ok && err.IsRetriable() {
if attempt < *pvmAttempts+*attempts {
attempt++
Expand Down Expand Up @@ -324,14 +326,14 @@ func buildRequest(filename, project string) (*genomics.RunPipelineRequest, error

if *output != "" {
action := gsutil("cp", "/google/logs/output", *output)
action.Flags = []string{"ALWAYS_RUN"}
action.AlwaysRun = true
delocalizers = append(delocalizers, action)
}

var actions []*genomics.Action
if *outputInterval != 0 && *output != "" {
action := bash(fmt.Sprintf("while true; do sleep %.0f; gsutil -q cp /google/logs/output %s; done", (*outputInterval).Seconds(), *output))
action.Flags = []string{"RUN_IN_BACKGROUND"}
action.RunInBackground = true
actions = append(actions, action)
}

Expand Down Expand Up @@ -368,7 +370,7 @@ func buildRequest(filename, project string) (*genomics.RunPipelineRequest, error
}

if *network != "" {
vm.Network.Name = *network
vm.Network.Network = *network
}
if *subnetwork != "" {
vm.Network.Subnetwork = *subnetwork
Expand All @@ -382,7 +384,6 @@ func buildRequest(filename, project string) (*genomics.RunPipelineRequest, error
}

resources := &genomics.Resources{
ProjectId: project,
VirtualMachine: vm,
}
if *regions != "" && *zones != "" {
Expand Down Expand Up @@ -494,12 +495,23 @@ func parse(line string) (*genomics.Action, error) {
var action genomics.Action

options := make(map[string]string)
flags := map[string]*bool{
"IGNORE_EXIT_STATUS": &action.IgnoreExitStatus,
"RUN_IN_BACKGROUND": &action.RunInBackground,
"ALWAYS_RUN": &action.AlwaysRun,
"ENABLE_FUSE": &action.EnableFuse,
"PUBLISH_EXPOSED_PORTS": &action.PublishExposedPorts,
"DISABLE_IMAGE_PREFETCH": &action.DisableImagePrefetch,
"DISABLE_STANDARD_ERROR_CAPTURE": &action.DisableStandardErrorCapture,
}
if n := strings.Index(line, "#"); n >= 0 {
for _, option := range strings.Fields(strings.TrimSpace(line[n+1:])) {
if n := strings.Index(option, "="); n >= 0 {
options[option[:n]] = option[n+1:]
} else if p, ok := flags[strings.ToUpper(option)]; ok {
*p = true
} else {
action.Flags = append(action.Flags, strings.ToUpper(option))
return nil, fmt.Errorf("unknown action flag %q specified", option)
}
}
line = line[:n]
Expand All @@ -508,7 +520,7 @@ func parse(line string) (*genomics.Action, error) {
commands := strings.Fields(strings.TrimSpace(line))
if len(commands) > 0 {
if commands[len(commands)-1] == "&" {
action.Flags = append(action.Flags, "RUN_IN_BACKGROUND")
action.RunInBackground = true
commands = commands[:len(commands)-1]
}
action.Commands = []string{"-c", strings.Join(commands, " ")}
Expand Down Expand Up @@ -726,7 +738,7 @@ func cancelOnInterrupt(ctx context.Context, service *genomics.Service, name stri
<-abort
fmt.Println("Cancelling operation...")
req := &genomics.CancelOperationRequest{}
if _, err := service.Projects.Operations.Cancel(name, req).Context(ctx).Do(); err != nil {
if _, err := service.Projects.Locations.Operations.Cancel(name, req).Context(ctx).Do(); err != nil {
fmt.Printf("Failed to cancel operation: %v\n", err)
}
}()
Expand Down Expand Up @@ -771,10 +783,11 @@ func gcsFuse(buckets map[string]string) []*genomics.Action {
var actions []*genomics.Action
for bucket, path := range buckets {
actions = append(actions, &genomics.Action{
ImageUri: "gcr.io/cloud-genomics-pipelines/gcsfuse",
Commands: []string{"--implicit-dirs", "--foreground", bucket, path},
Flags: []string{"ENABLE_FUSE", "RUN_IN_BACKGROUND"},
Mounts: []*genomics.Mount{googleRoot},
ImageUri: "gcr.io/cloud-genomics-pipelines/gcsfuse",
Commands: []string{"--implicit-dirs", "--foreground", bucket, path},
EnableFuse: true,
RunInBackground: true,
Mounts: []*genomics.Mount{googleRoot},
})
actions = append(actions, &genomics.Action{
ImageUri: "gcr.io/cloud-genomics-pipelines/gcsfuse",
Expand All @@ -787,9 +800,9 @@ func gcsFuse(buckets map[string]string) []*genomics.Action {

func sshDebug(project string) *genomics.Action {
return &genomics.Action{
ImageUri: "gcr.io/cloud-genomics-pipelines/tools",
Entrypoint: "ssh-server",
PortMappings: map[string]int64{"22": 22},
Flags: []string{"RUN_IN_BACKGROUND"},
ImageUri: "gcr.io/cloud-genomics-pipelines/tools",
Entrypoint: "ssh-server",
PortMappings: map[string]int64{"22": 22},
RunInBackground: true,
}
}
15 changes: 10 additions & 5 deletions pipelines/internal/commands/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
genomics "google.golang.org/api/genomics/v2alpha1"
genomics "google.golang.org/api/lifesciences/v2beta"
)

var (
Expand All @@ -34,13 +34,13 @@ var (
details = flags.Bool("details", false, "show event details")
)

func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error {
func Invoke(ctx context.Context, service *genomics.Service, project, location string, arguments []string) error {
names := common.ParseFlags(flags, arguments)
if len(names) < 1 {
return errors.New("missing operation name")
}

name := common.ExpandOperationName(project, names[0])
name := common.ExpandOperationName(project, location, names[0])
result, err := watch(ctx, service, name)
if err != nil {
return fmt.Errorf("watching pipeline: %v", err)
Expand All @@ -58,8 +58,9 @@ func watch(ctx context.Context, service *genomics.Service, name string) (interfa
var events []*genomics.Event
const initialDelay = 5 * time.Second
delay := initialDelay

for {
lro, err := service.Projects.Operations.Get(name).Context(ctx).Do()
lro, err := service.Projects.Locations.Operations.Get(name).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("getting operation status: %v", err)
}
Expand All @@ -84,7 +85,11 @@ func watch(ctx context.Context, service *genomics.Service, name string) (interfa
fmt.Println(timestamp.Format("15:04:05"), metadata.Events[i].Description)

if *details {
fmt.Println(string(metadata.Events[i].Details))
encoded, err := json.MarshalIndent(metadata.Events[i], "", " ")
if err != nil {
return nil, fmt.Errorf("encoding event: %v", err)
}
fmt.Println(string(encoded))
}
}
events = metadata.Events
Expand Down
13 changes: 8 additions & 5 deletions pipelines/internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import (
"path"
"strings"

genomics "google.golang.org/api/genomics/v2alpha1"
genomics "google.golang.org/api/lifesciences/v2beta"
"google.golang.org/genproto/googleapis/rpc/code"
)

// ExpandOperationName adds the project and operations prefixes to name (if
// ExpandOperationName adds the project, location and operations prefixes to name (if
// they are not already present).
func ExpandOperationName(project, name string) string {
func ExpandOperationName(project, location, name string) string {
if !strings.HasPrefix(name, "projects/") {
if !strings.HasPrefix(name, "operations/") {
name = path.Join("operations/", name)
if !strings.HasPrefix(name, "locations/") {
if !strings.HasPrefix(name, "operations/") {
name = path.Join("operations/", name)
}
name = path.Join("locations", location, name)
}
name = path.Join("projects", project, name)
}
Expand Down
9 changes: 5 additions & 4 deletions pipelines/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import (

"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
genomics "google.golang.org/api/genomics/v2alpha1"
genomics "google.golang.org/api/lifesciences/v2beta"
)

var (
project = flag.String("project", defaultProject(), "the cloud project name")
location = flag.String("location", "us-central1", "Google cloud location to store the metadata for the operations")
basePath = flag.String("api", "", "the API base to use")

commands = map[string]func(context.Context, *genomics.Service, string, []string) error{
commands = map[string]func(context.Context, *genomics.Service, string, string, []string) error{
"run": run.Invoke,
"cancel": cancel.Invoke,
"query": query.Invoke,
Expand Down Expand Up @@ -76,7 +77,7 @@ func main() {
exitf("Failed to create service: %v", err)
}

if err := invoke(ctx, service, *project, flag.Args()[1:]); err != nil {
if err := invoke(ctx, service, *project, *location, flag.Args()[1:]); err != nil {
exitf("%q: %v", command, err)
}
}
Expand All @@ -98,7 +99,7 @@ func newService(ctx context.Context, basePath string) (*genomics.Service, error)

ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{Transport: &transport})

client, err := google.DefaultClient(ctx, genomics.GenomicsScope)
client, err := google.DefaultClient(ctx, genomics.CloudPlatformScope)
if err != nil {
return nil, fmt.Errorf("creating authenticated client: %v", err)
}
Expand Down

0 comments on commit 41825be

Please sign in to comment.