feat: add cache download sub command
Ryan Routsong committed Nov 29, 2023
1 parent 8427060 commit 931fe21
Showing 5 changed files with 131 additions and 5 deletions.
7 changes: 7 additions & 0 deletions config/remote.json
@@ -0,0 +1,7 @@
{
    "bcl2fastq": "docker://umccr/bcl2fastq:latest",
    "weave": "docker://rroutsong/weave_ngsqc:0.0.1",
    "kraken": "https://genome-idx.s3.amazonaws.com/kraken/k2_pluspfp_16gb_20231009.tar.gz",
    "kaiju": "https://kaiju-idx.s3.eu-central-1.amazonaws.com/2023/kaiju_db_nr_euk_2023-05-10.tgz",
    "fastq_screen": "filelist://www.bioinformatics.babraham.ac.uk/projects/fastq_screen/genome_locations.txt"
}
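
Each value above is a URI whose scheme (docker://, https://, filelist://) selects a download strategy in the new scripts/cache.py below. A minimal sketch of how such a URI splits into a (protocol, location) pair, mirroring the parse_uri helper added in that file:

    parse_uri = lambda uri: tuple(str(uri).split('://', 1)) if '://' in str(uri) else None

    protocol, url = parse_uri("docker://umccr/bcl2fastq:latest")
    assert (protocol, url) == ("docker", "umccr/bcl2fastq:latest")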
1 change: 1 addition & 0 deletions requirements.txt
@@ -3,3 +3,4 @@ requests
terminaltables
pyyaml
tabulate
progressbar
100 changes: 100 additions & 0 deletions scripts/cache.py
@@ -0,0 +1,100 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# ~~~~~~~~~~~~~~~
# Miscellaneous utility functions for caching
# pipeline resources
# ~~~~~~~~~~~~~~~
import subprocess
import json
import urllib.request
import progressbar
from argparse import ArgumentTypeError
from pathlib import Path
from urllib.parse import urlparse

from .config import remote_resource_config
from .utils import esc_colors


parse_uri = lambda uri: tuple(str(uri).split('://', 1)) if '://' in str(uri) else None
info_download = lambda msg: print(esc_colors.OKGREEN + msg + esc_colors.ENDC)


class DownloadProgressBar():
    """Progress hook for urllib.request.urlretrieve."""
    def __init__(self):
        self.pbar = None

    def __call__(self, block_num, block_size, total_size):
        # urlretrieve invokes this hook once per downloaded block
        if not self.pbar:
            self.pbar = progressbar.ProgressBar(maxval=total_size)
            self.pbar.start()

        downloaded = block_num * block_size
        if downloaded < total_size:
            self.pbar.update(downloaded)
        else:
            self.pbar.finish()


def valid_dir(path):
    """Validate path input for argument parsing
    Returns:
        (str): Absolute path to the vetted output directory
    """
    if Path(path).is_dir():
        return str(Path(path).absolute())
    elif not Path(path).exists():
        try:
            Path(path).mkdir(mode=0o777, parents=True)
        except OSError:
            raise ArgumentTypeError(f"dir:{path} doesn't exist and can't be created")
        return str(Path(path).absolute())
    raise ArgumentTypeError(f"readable_dir:{path} is not a valid path")



def download(output_dir, local=False):
    """Download the resource bundle for the pipeline.
    Returns:
        (bool): True if successful, False otherwise.
    """
    print(esc_colors.WARNING + 'Warning: cache download is currently only implemented in serial local mode' + esc_colors.ENDC)
    # TODO: slurm implementation
    with open(remote_resource_config) as fh:
        resources_to_download = json.load(fh)

    for resource, uri in resources_to_download.items():
        protocol, url = parse_uri(uri)
        handle_download(output_dir, resource, protocol, url)

    print(esc_colors.OKGREEN + 'All resources downloaded!' + esc_colors.ENDC)

    return True


def handle_download(output_dir, resource, protocol, url):
    uri = protocol + "://" + url
    if protocol in ('http', 'https', 'ftp'):
        info_download(f"Getting web resource {resource}...")
        fnurl = Path(urlparse(url).path).stem
        urllib.request.urlretrieve(uri, filename=Path(output_dir, fnurl), reporthook=DownloadProgressBar())
    elif protocol == 'docker':
        info_download(f"Getting docker resource {resource}...")
        docker_tag = url.split('/')[-1]
        docker_name, docker_v = docker_tag.split(':')
        subprocess.check_call(['singularity', 'pull', '-F', f"{docker_name}_{docker_v}.sif", uri], cwd=output_dir)
    elif protocol == 'filelist':
        info_download(f"Getting meta-resource {resource}...")
        # filelist:// points at a plain-text manifest with one URI per line
        # (assumes the manifest itself is reachable over HTTPS)
        with urllib.request.urlopen('https://' + url) as response:
            file_list = [line.decode().strip() for line in response if line.strip()]
        for i, _file_uri in enumerate(file_list, start=1):
            this_protocol, this_url = parse_uri(_file_uri)
            print(f"\tGetting resource {i} of {len(file_list)}")
            handle_download(output_dir, resource, this_protocol, this_url)
    else:
        raise ValueError(f"Unsupported resource protocol: {protocol}")
    return
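
For orientation, a short usage sketch of the new module; the cache path is illustrative, not part of the commit:

    from scripts import cache

    cache_dir = cache.valid_dir('/data/weave_cache')  # validated, created if missing
    cache.download(cache_dir, local=True)             # serial local mode only, per the warning above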
3 changes: 3 additions & 0 deletions scripts/config.py
@@ -61,6 +61,9 @@ def get_current_server():


# ~~~ configuration helpers ~~~
remote_resource_config = Path(Path(__file__).parent, '..', 'config', 'remote.json').resolve()


def get_resource_config():
    """Return a dictionary containing server specific references utilized in
    the workflow for directories or reference files.
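
For reference, scripts/cache.py consumes this new constant by loading the JSON manifest it points to; a brief sketch:

    import json
    from scripts.config import remote_resource_config

    with open(remote_resource_config) as fh:
        resources = json.load(fh)  # e.g. {"kraken": "https://genome-idx.s3.amazonaws.com/...", ...}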
25 changes: 20 additions & 5 deletions weave
@@ -2,9 +2,9 @@
# -*- coding: UTF-8 -*-
import argparse
 from pathlib import Path
-from scripts import utils, files, config
+from scripts import utils, files, config, cache
 
 # ~~~~ sub commands ~~~~
def run(args):
    """
    Main frontend for demultiplexing and QA/QC
@@ -19,8 +19,9 @@ def run(args):
         for i, sample in enumerate(sample_sheet.samples, start=1)
     ]
     project_list = list(set([_sample.Sample_Project for _sample in sample_sheet.samples]))
-    # TODO: enforce single project
-    assert len(project_list) == 1
+    if len(project_list) > 1:
+        raise NotImplementedError("Unable to process multiple projects currently.\n" +
+                                  "Please file an issue if this message is blocking: https://github.com/OpenOmics/weave/issues")
 
     pairs = ['1', '2'] if sample_sheet.is_paired_end else ['1']

@@ -46,11 +47,18 @@ def run(args):
    utils.exec_pipeline(exec_config, dry_run=args.dry_run, local=args.local)


def get_cache(sub_args):
    """
    Main frontend for cache execution
    """
    cache.download(sub_args.cachedir, local=sub_args.local)


# ~~~~ argument parsing commands ~~~~
if __name__ == '__main__':
    main_parser = argparse.ArgumentParser(prog='weave')
    sub_parsers = main_parser.add_subparsers(help='run subcommand help')

    # ~~~ run subcommand ~~~
    parser_run = sub_parsers.add_parser('run')
    parser_run.add_argument('rundir', metavar='<run directory>', nargs="+", type=utils.valid_run_input,
                            help='Full & complete run id (format YYMMDD_INSTRUMENTID_TIME_FLOWCELLID) or absolute paths')
@@ -66,6 +74,13 @@ if __name__ == '__main__':
    parser_run.add_argument('-l', '--local', action='store_true',
                            help='Execute pipeline locally without a dispatching executor')

    # ~~~ cache subcommand ~~~
    parser_cache = sub_parsers.add_parser('cache')
    parser_cache.add_argument('cachedir', metavar='<cache directory>', type=cache.valid_dir,
                              help='Relative or absolute path to directory for cache storage')
    parser_cache.add_argument('-l', '--local', action='store_true',
                              help='Execute pipeline locally without a dispatching executor')

    parser_run.set_defaults(func=run)
    parser_cache.set_defaults(func=get_cache)
    args = main_parser.parse_args()
    args.func(args)
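
With the set_defaults wiring above, argparse dispatches each subcommand to its handler. A hedged sketch of the resulting invocations (the cache path is illustrative):

    # weave cache /data/weave_cache --local  -> args.func is get_cache, downloading every entry in config/remote.json
    # weave run <run directory> [options]    -> args.func is run
    args = main_parser.parse_args(['cache', '/data/weave_cache', '--local'])
    args.func(args)  # equivalent to get_cache(args)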
