Merge pull request #40 from awslabs/pywming-dev
refactor cli tools to be pip installable
wleepang authored Jun 12, 2024
2 parents 998816b + 4141b5b commit 82fbd8d
Showing 9 changed files with 719 additions and 181 deletions.
162 changes: 125 additions & 37 deletions README.md
# AWS HealthOmics Tools

SDK and CLI Tools for working with the AWS HealthOmics Service.

- [AWS HealthOmics Tools](#aws-healthomics-tools)
- [Installation](#installation)
- [SDK Tools](#sdk-tools)
- [Omics Transfer Manager](#omics-transfer-manager)
- [Basic Usage](#basic-usage)
- [Download specific files](#download-specific-files)
- [Upload specific files](#upload-specific-files)
- [Subscribe to events](#subscribe-to-events)
- [Threads](#threads)
- [Omics URI Parser](#omics-uri-parser)
- [Readset file URI:](#readset-file-uri)
- [Reference file URI:](#reference-file-uri)
- [CLI Tools](#cli-tools)
- [Omics Rerun](#omics-rerun)
- [List runs from manifest](#list-runs-from-manifest)
- [Rerun a previously-executed run](#rerun-a-previously-executed-run)
- [Omics Run Analyzer](#omics-run-analyzer)
- [List completed runs](#list-completed-runs)
- [Analyze a specific workflow run](#analyze-a-specific-workflow-run)
- [Output workflow run manifest in JSON format](#output-workflow-run-manifest-in-json-format)
- [Security](#security)
- [License](#license)

## Installation
AWS HealthOmics Tools is available through PyPI. To install, type:

```bash
pip install amazon-omics-tools
```

To install from source:

```bash
git clone https://github.com/awslabs/amazon-omics-tools.git
pip install ./amazon-omics-tools
```

## SDK Tools

### Omics Transfer Manager

#### Basic Usage
The `TransferManager` class makes it easy to download files for an Omics reference or read set. By default the files are saved to the current directory, or you can specify a custom location with the `directory` parameter.

```python
# ...
manager.download_reference(REFERENCE_STORE_ID, "<my-reference-id>")
manager.download_read_set(SEQUENCE_STORE_ID, "<my-read-set-id>", "my-sequence-data")
```

#### Download specific files
Specific files can be downloaded via the `download_reference_file` and `download_read_set_file` methods.
The `client_fileobj` parameter can be either the name of a local file to create for storing the data, or a `TextIO` or `BinaryIO` object that supports write methods.

```python
# ...
manager.download_read_set_file(
    # ...
)
```

#### Upload specific files
Specific files can be uploaded via the `upload_read_set` method.
The `fileobjs` parameter can be either the name of a local file, or a `TextIO` or `BinaryIO` object that supports read methods.
For paired end reads, you can define `fileobjs` as a list of files.
```python
# ...
read_set_id = manager.upload_read_set(
    # ...
)
```

#### Subscribe to events
Transfer events: `on_queued`, `on_progress`, and `on_done` can be observed by defining a subclass of `OmicsTransferSubscriber` and passing in an object which can receive events.

```python
# ...
class ProgressReporter(OmicsTransferSubscriber):
    ...  # event handler methods elided
manager.download_read_set(SEQUENCE_STORE_ID, "<my-read-set-id>", subscribers=[ProgressReporter()])
```

#### Threads
Transfer operations use threads to implement concurrency. Thread use can be disabled by setting the `use_threads` attribute to False.

If thread use is disabled, transfer concurrency does not occur. Accordingly, the value of the `max_request_concurrency` attribute is ignored.
```python
# ...
manager = TransferManager(client, config)
manager.download_read_set(SEQUENCE_STORE_ID, "<my-read-set-id>")
```
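For intuition, `max_request_concurrency` behaves like the worker cap on a thread pool. The following is a self-contained sketch using only the standard library, not the `TransferManager` internals:

```python
# Sketch: capping concurrent transfer requests with a bounded thread pool.
# This mirrors what a max_request_concurrency-style setting controls; it is
# not the TransferManager's actual implementation.
from concurrent.futures import ThreadPoolExecutor
import threading

active = 0
peak = 0
lock = threading.Lock()

def fake_download(part):
    """Stand-in for one file-part transfer; tracks concurrent workers."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    # ... transfer bytes here ...
    with lock:
        active -= 1
    return part

with ThreadPoolExecutor(max_workers=4) as pool:  # like max_request_concurrency=4
    results = list(pool.map(fake_download, range(20)))

print(peak)  # never exceeds 4
```

Setting `use_threads=False` corresponds to running the same work sequentially, which is why the concurrency setting is ignored in that mode.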

### Omics URI Parser

The `OmicsUriParser` class makes it easy to parse Omics read set and reference URIs and extract the fields needed to call AWS HealthOmics APIs.

#### Readset file URI:
Readset file URIs come in the following format:
```text
omics://<AWS_ACCOUNT_ID>.storage.<AWS_REGION>.amazonaws.com/<SEQUENCE_STORE_ID>/readSet/<READSET_ID>/<SOURCE1/SOURCE2>
```

For example:
```text
omics://123412341234.storage.us-east-1.amazonaws.com/5432154321/readSet/5346184667/source1
omics://123412341234.storage.us-east-1.amazonaws.com/5432154321/readSet/5346184667/source2
```

#### Reference file URI:
Reference file URIs come in the following format:
```text
omics://<AWS_ACCOUNT_ID>.storage.<AWS_REGION>.amazonaws.com/<REFERENCE_STORE_ID>/reference/<REFERENCE_ID>/source
```
For example:
```text
omics://123412341234.storage.us-east-1.amazonaws.com/5432154321/reference/5346184667/source
```
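For illustration, the anatomy of these URIs can be pulled apart with the standard library alone. This sketch is independent of `OmicsUriParser`; the field names in the returned dict are our own labels, not the library's:

```python
# Sketch: dissecting an omics:// URI with the standard library.
# OmicsUriParser does this for you; this only illustrates the URI anatomy.
from urllib.parse import urlparse

def dissect(uri):
    parts = urlparse(uri)
    # netloc: <ACCOUNT_ID>.storage.<REGION>.amazonaws.com
    account, _, rest = parts.netloc.partition(".storage.")
    region = rest.split(".amazonaws.com")[0]
    # path: /<STORE_ID>/<readSet|reference>/<RESOURCE_ID>/<source*>
    store_id, resource_type, resource_id, source = parts.path.strip("/").split("/")
    return {
        "account": account,
        "region": region,
        "store_id": store_id,
        "resource_type": resource_type,  # "readSet" or "reference"
        "resource_id": resource_id,
        "source": source,
    }

fields = dissect(
    "omics://123412341234.storage.us-east-1.amazonaws.com/5432154321/readSet/5346184667/source1"
)
print(fields["region"], fields["resource_type"], fields["source"])
# -> us-east-1 readSet source1
```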

To handle both Omics URI types, you would use code like the following:

```python
import boto3
from omics.uriparse.uri_parse import OmicsUriParser, OmicsUri
# ...
manager.download_read_set_file(
    # ...
)
```

## CLI Tools
CLI tools are modules in this package that can be invoked from the command line with:

```bash
python -m omics.cli.<TOOL-NAME>
```

### Omics Rerun

The `omics-rerun` tool makes it easy to start a new run execution from a CloudWatch Logs manifest.

For an overview of what it does and available options, run:

```bash
python -m omics.cli.rerun -h
```

#### List runs from manifest
The following example lists all workflow run IDs that completed on July 1st (UTC):
```bash
python -m omics.cli.rerun -s 2023-07-01T00:00:00 -e 2023-07-02T00:00:00
```

This returns something like:

```text
1234567 (2023-07-01T12:00:00.000)
2345678 (2023-07-01T13:00:00.000)
```
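Conceptually, the `-s`/`-e` options apply a UTC time-window filter over run creation times, along the lines of this self-contained sketch (illustrative only, not the tool's implementation):

```python
# Sketch: selecting runs whose creation time falls in a half-open UTC window,
# as the -s/-e options do. The run records here are made up.
from datetime import datetime

runs = [
    {"id": "1234567", "creationTime": "2023-07-01T12:00:00.000"},
    {"id": "2345678", "creationTime": "2023-07-01T13:00:00.000"},
    {"id": "3456789", "creationTime": "2023-07-02T09:30:00.000"},
]

start = datetime.fromisoformat("2023-07-01T00:00:00")
end = datetime.fromisoformat("2023-07-02T00:00:00")

selected = [
    r["id"] for r in runs
    if start <= datetime.fromisoformat(r["creationTime"]) < end
]
print(selected)  # -> ['1234567', '2345678']
```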

#### Rerun a previously-executed run
To rerun a previously-executed run, specify the run id you would like to rerun:

```bash
python -m omics.cli.rerun 1234567
```

This returns something like:

```text
StartRun request:
{
"workflowId": "4974161",
  ...
}
StartRun response:
  ...
```

It is possible to override a request parameter from the original run. The following example tags the new run, which is particularly useful as tags are not propagated from the original run.
```bash
python -m omics.cli.rerun 1234567 --tag=myKey=myValue
```

This returns something like:
```text
StartRun request:
{
"workflowId": "4974161",
  ...
}
StartRun response:
  ...
```
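Conceptually, a `--tag=key=value` override folds extra tags into the StartRun request, roughly as in this self-contained sketch (`apply_tags` is our own helper, not the tool's actual option handling):

```python
# Sketch: folding --tag=key=value arguments into a request dict.
# Illustrative only; omics-rerun's real option parsing may differ.
def apply_tags(request, tag_args):
    tags = request.setdefault("tags", {})
    for arg in tag_args:
        key, _, value = arg.partition("=")
        tags[key] = value
    return request

rqst = apply_tags({"workflowId": "4974161"}, ["myKey=myValue"])
print(rqst)  # -> {'workflowId': '4974161', 'tags': {'myKey': 'myValue'}}
```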

Before submitting a rerun request, it is possible to dry-run to view the new StartRun request:
```bash
python -m omics.cli.rerun -d 1234567
```

This returns something like:
```text
StartRun request:
{
"workflowId": "4974161",
  ...
}
```

### Omics Run Analyzer
The `omics-run-analyzer` tool retrieves a workflow run manifest from CloudWatch Logs and generates statistics for the run, including CPU and memory utilization for each workflow task.

For an overview of what it does and available options, run:

```bash
python -m omics.cli.run_analyzer -h
```

#### List completed runs
The following example lists all workflow runs completed in the past 5 days:
```bash
python -m omics.cli.run_analyzer -t5d
```

This returns something like:

```text
Workflow run IDs (<completionTime> <UUID>):
1234567 (2024-02-01T12:00:00 12345678-1234-5678-9abc-123456789012)
2345678 (2024-02-03T13:00:00 12345678-1234-5678-9abc-123456789012)
```

#### Analyze a specific workflow run
```bash
python -m omics.cli.run_analyzer 1234567 -o run-1234567.csv
```

This returns something like:

```text
omics-run-analyzer: wrote run-1234567.csv
```

Expand Down Expand Up @@ -296,8 +379,13 @@ The CSV output by the command above includes the following columns:
* __storageAverageGiB__ : Average gibibytes of storage used by the workflow run
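Once written, the CSV can be consumed with the standard library. A self-contained sketch follows; the rows and the `name` column are made up for illustration, while `storageAverageGiB` is the column documented above:

```python
# Sketch: reading an omics-run-analyzer CSV. The sample data is invented;
# a real file would contain the full set of columns listed above.
import csv
import io

sample = io.StringIO(
    "name,storageAverageGiB\n"
    "task-a,10.5\n"
    "task-b,4.0\n"
)

rows = list(csv.DictReader(sample))
total_gib = sum(float(r["storageAverageGiB"]) for r in rows)
print(total_gib)  # -> 14.5
```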

#### Output workflow run manifest in JSON format

```bash
python -m omics.cli.run_analyzer 1234567 -s -o run-1234567.json
```

This returns something like:
```text
omics-run-analyzer: wrote run-1234567.json
```

Empty file added omics/cli/__init__.py
Empty file.
Empty file added omics/cli/rerun/__init__.py
Empty file.
30 changes: 15 additions & 15 deletions omics/rerun/omics-rerun → omics/cli/rerun/__main__.py
def stream_to_run(strm):
m = re.match(r"^manifest/run/(\d+)/[a-f0-9-]+$", strm["logStreamName"])
if not m:
return None
creation_time = datetime.datetime.fromtimestamp(strm["creationTime"] / 1000.0).isoformat(
timespec="milliseconds"
)
return {
"id": m.group(1),
"creationTime": creation_time,

def get_streams(logs, rqst, opts={}):
"""Get matching CloudWatch Log streams"""
start_time = (
dateutil.parser.parse(opts["--start"]).timestamp() * 1000.0 if opts.get("--start") else None
)
end_time = (
dateutil.parser.parse(opts["--end"]).timestamp() * 1000.0 if opts.get("--end") else None
)
streams = []
while True:
try:
def get_runs(logs, runs, opts):
# Get runs in time range
rqst = {
"logGroupName": "/aws/omics/WorkflowLog",
"logStreamNamePrefix": "manifest/run/",
}
streams.extend(get_streams(logs, rqst, opts))
runs = [stream_to_run(s) for s in streams]

def start_run_request(run, opts={}):
"""Build StartRun request"""

def set_param(rqst, key, key0, val=None):
if not val and opts and key0:
val = opts[key0]
out.write(f"{r['id']} ({r['creationTime']})\n")
else:
resources = get_run_resources(logs, runs[0])
run = [r for r in resources if r["arn"].endswith(f"run/{runs[0]['id']}")]
run = run[0] if run else None
if not resources:
die("no workflow run resources")
rqst0 = start_run_request(run)
rqst = start_run_request(run, opts)
if rqst != rqst0:
out.write(f"Original request:\n{json.dumps(rqst0, indent=2)}\n")
out.write(f"StartRun request:\n{json.dumps(rqst, indent=2)}\n")
if not opts["--dry-run"]:
try:
omics = boto3.client("omics")
resp = omics.start_run(**rqst)
except Exception as e:
die(f"StartRun failed: {e}")
del resp["ResponseMetadata"] # type: ignore
out.write(f"StartRun response:\n{json.dumps(resp, indent=2)}\n")

if opts["--out"]:
out.close()
sys.stderr.write(f"{exename}: wrote {opts['--out']}\n")

Empty file.
OMICS_SERVICE_CODE = "AmazonOmics"
PRICING_AWS_REGION = "us-east-1" # Pricing service endpoint
SECS_PER_HOUR = 3600.0
STORAGE_TYPE_DYNAMIC_RUN_STORAGE = "DYNAMIC"
STORAGE_TYPE_STATIC_RUN_STORAGE = "STATIC"
PRICE_RESOURCE_TYPE_DYNAMIC_RUN_STORAGE = "Dynamic Run Storage"
PRICE_RESOURCE_TYPE_STATIC_RUN_STORAGE = "Run Storage"


def die(msg):
"""Show error message and terminate"""
def add_metrics(res, resources, pricing):
storage_type = res.get("storageType")

if rtype == "run":
# Get capacity requested (static), capacity max. used (dynamic) and
# charged storage (the requested capacity for static or average used for dynamic)
if storage_type == STORAGE_TYPE_STATIC_RUN_STORAGE:
price_resource_type = PRICE_RESOURCE_TYPE_STATIC_RUN_STORAGE
price = get_pricing(pricing, price_resource_type, region, gib_hrs)
if price:
metrics["estimatedUSD"] = price

# Get price for optimal static storage
if store_max:
capacity = get_static_storage_gib(store_max)
Expand Down
