diff --git a/phylogenetic/Snakefile b/phylogenetic/Snakefile
index 622ba18..03d717d 100644
--- a/phylogenetic/Snakefile
+++ b/phylogenetic/Snakefile
@@ -4,8 +4,8 @@ rule all:
     input:
         auspice_json = "auspice/zika.json"
 
-include: "rules/prepare_sequences.smk"
 include: "rules/merge_sequences_usvi.smk"
+include: "rules/prepare_sequences.smk"
 include: "rules/construct_phylogeny.smk"
 include: "rules/annotate_phylogeny.smk"
 include: "rules/export.smk"
diff --git a/phylogenetic/defaults/config_zika.yaml b/phylogenetic/defaults/config_zika.yaml
index a3627fb..ddaf039 100644
--- a/phylogenetic/defaults/config_zika.yaml
+++ b/phylogenetic/defaults/config_zika.yaml
@@ -1,7 +1,15 @@
 # Sequences must be FASTA and metadata must be TSV
-# Both files must be zstd compressed
-sequences_url: "https://data.nextstrain.org/files/workflows/zika/sequences.fasta.zst"
-metadata_url: "https://data.nextstrain.org/files/workflows/zika/metadata.tsv.zst"
+# Remote (s3://) files must be zstd compressed; local files are used as-is
+
+inputs:
+  - name: ncbi
+    metadata: "s3://nextstrain-data/files/workflows/zika/metadata.tsv.zst"
+    sequences: "s3://nextstrain-data/files/workflows/zika/sequences.fasta.zst"
+
+additional_inputs:
+  - name: usvi
+    metadata: "data/metadata_usvi.tsv"
+    sequences: "data/sequences_usvi.fasta"
 
 strain_id_field: "accession"
diff --git a/phylogenetic/rules/annotate_phylogeny.smk b/phylogenetic/rules/annotate_phylogeny.smk
index d09daf7..1de8bef 100644
--- a/phylogenetic/rules/annotate_phylogeny.smk
+++ b/phylogenetic/rules/annotate_phylogeny.smk
@@ -73,7 +73,7 @@ rule traits:
     """
     input:
         tree = "results/tree.nwk",
-        metadata = "data/metadata_all.tsv"
+        metadata = input_metadata,
     output:
        node_data = "results/traits.json",
     params:
diff --git a/phylogenetic/rules/construct_phylogeny.smk b/phylogenetic/rules/construct_phylogeny.smk
index 3393ce0..b143d76 100644
--- a/phylogenetic/rules/construct_phylogeny.smk
+++ b/phylogenetic/rules/construct_phylogeny.smk
@@ -43,7 +43,7 @@ rule refine:
     input:
         tree = "results/tree_raw.nwk",
         alignment = "results/aligned.fasta",
-        metadata = "data/metadata_all.tsv"
+        metadata = input_metadata,
     output:
         tree = "results/tree.nwk",
         node_data = "results/branch_lengths.json"
diff --git a/phylogenetic/rules/export.smk b/phylogenetic/rules/export.smk
index 77d020f..f09870b 100644
--- a/phylogenetic/rules/export.smk
+++ b/phylogenetic/rules/export.smk
@@ -29,7 +29,7 @@ rule export:
-    """Exporting data files for for auspice"""
+    """Exporting data files for auspice"""
     input:
         tree = "results/tree.nwk",
-        metadata = "data/metadata_all.tsv",
+        metadata = input_metadata,
         branch_lengths = "results/branch_lengths.json",
         traits = "results/traits.json",
         nt_muts = "results/nt_muts.json",
diff --git a/phylogenetic/rules/merge_sequences_usvi.smk b/phylogenetic/rules/merge_sequences_usvi.smk
index 3765937..7ee0c7f 100644
--- a/phylogenetic/rules/merge_sequences_usvi.smk
+++ b/phylogenetic/rules/merge_sequences_usvi.smk
@@ -21,22 +21,141 @@ This part of the workflow usually includes the following steps:
 
 """
 
-rule append_usvi:
-    """Appending USVI sequences"""
-    input:
-        sequences = "data/sequences.fasta",
-        metadata = "data/metadata.tsv",
-        usvi_sequences = "data/sequences_usvi.fasta",
-        usvi_metadata = "data/metadata_usvi.tsv"
+# ------------- helper functions to collect, merge & download input files ------------------- #
+
+NEXTSTRAIN_PUBLIC_BUCKET = "s3://nextstrain-data/"
+
+# Raised by the helpers below; defined here so a malformed config surfaces as a
+# config problem rather than a NameError.
+class InvalidConfigError(Exception):
+    pass
+
+def _parse_config_input(input):
+    """
+    Parses information from an individual config-defined input, i.e. an element
+    within `config.inputs` or `config.additional_inputs`, and returns information
+    snakemake rules can use to obtain the underlying data.
+
+    The structure of `input` is a dictionary with keys:
+    - name:string (required)
+    - metadata:string (optional) - an S3 URI or a local file path
+    - sequences:string (optional) - an S3 URI or a local file path
+
+    Returns a dictionary with keys:
+    - metadata:string|None - the relative path to the metadata file. If the original
+      data was remote then this is the output of a rule which downloads the file.
+    - metadata_location:string|None - the URI for the remote file, if applicable
+    - sequences:string|None - the relative path to the sequences FASTA, analogous
+      to `metadata`
+    - sequences_location:string|None - the URI for the remote file, if applicable
+
+    Raises InvalidConfigError
+    """
+    name = input['name']
+
+    info = {'metadata': None, 'metadata_location': None, 'sequences': None, 'sequences_location': None}
+
+    def _source(uri, *, s3, local):
+        if uri.startswith('s3://'):
+            return s3
+        elif uri.lower().startswith('http://') or uri.lower().startswith('https://'):
+            raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs")
+        return local
+
+    if location := input.get('metadata', False):
+        info['metadata'] = _source(location, s3=f"data/{name}/metadata.tsv", local=location)
+        info['metadata_location'] = _source(location, s3=location, local=None)
+
+    if location := input.get('sequences', False):
+        info['sequences'] = _source(location, s3=f"data/{name}/sequences.fasta", local=location)
+        info['sequences_location'] = _source(location, s3=location, local=None)
+
+    return info
+
+
+def _gather_inputs():
+    all_inputs = [*config.get('inputs', []), *config.get('additional_inputs', [])]
+
+    if len(all_inputs)==0:
+        raise InvalidConfigError("Config must define at least one element in the config.inputs or config.additional_inputs lists")
+    if not all([isinstance(i, dict) for i in all_inputs]):
+        raise InvalidConfigError("All of the elements in the config.inputs and config.additional_inputs lists must be dictionaries. "
+            "If you've used a command-line '--config' overlay, double-check your quoting.")
+    if len({i['name'] for i in all_inputs})!=len(all_inputs):
+        raise InvalidConfigError("Names of inputs (config.inputs and config.additional_inputs) must be unique")
+    if not all(['name' in i and ('sequences' in i or 'metadata' in i) for i in all_inputs]):
+        raise InvalidConfigError("Each input (config.inputs and config.additional_inputs) must have a 'name' and 'metadata' and/or 'sequences'")
+
+    return {i['name']: _parse_config_input(i) for i in all_inputs}
+
+input_sources = _gather_inputs()
+
+def input_metadata(wildcards):
+    inputs = [info['metadata'] for info in input_sources.values() if info.get('metadata', None)]
+    return inputs[0] if len(inputs)==1 else "results/metadata_merged.tsv"
+
+def input_sequences(wildcards):
+    inputs = [info['sequences'] for info in input_sources.values() if info.get('sequences', None)]
+    return inputs[0] if len(inputs)==1 else "results/sequences_merged.fasta"
+
+# For a multi-segment pathogen, `info['sequences']` would instead be a function of
+# the `{segment}` wildcard and `input_sequences` would look something like:
+# def input_sequences(wildcards):
+#     inputs = list(filter(None, [info['sequences'](wildcards) for info in input_sources.values() if info.get('sequences', None)]))
+#     return inputs[0] if len(inputs)==1 else "results/sequences_merged_{segment}.fasta"
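+
+# As an illustration (values below are derived from the default config_zika.yaml
+# above, not hard-coded anywhere): `input_sources` resolves to
+#   {"ncbi": {"metadata": "data/ncbi/metadata.tsv",
+#             "metadata_location": "s3://nextstrain-data/files/workflows/zika/metadata.tsv.zst",
+#             "sequences": "data/ncbi/sequences.fasta",
+#             "sequences_location": "s3://nextstrain-data/files/workflows/zika/sequences.fasta.zst"},
+#    "usvi": {"metadata": "data/metadata_usvi.tsv", "metadata_location": None,
+#             "sequences": "data/sequences_usvi.fasta", "sequences_location": None}}
+# With two metadata/sequences inputs, `input_metadata`/`input_sequences` return the
+# merged-file paths and the merge rules below are triggered; with a single input
+# they short-circuit to that input's own path and no merging (or downloading of
+# local files) happens.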
+
+rule download_s3_sequences:
+    """Download an input's sequences from S3 and decompress them"""
     output:
-        sequences = "data/sequences_all.fasta",
-        metadata = "data/metadata_all.tsv"
+        sequences = "data/{input_name}/sequences.fasta",
+    params:
+        address = lambda w: input_sources[w.input_name]['sequences_location'],
+        # deliberately left unquoted in the shell command: it is either empty or a fixed flag
+        no_sign_request = lambda w: "--no-sign-request" \
+            if input_sources[w.input_name]['sequences_location'].startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
+            else "",
     shell:
         """
-        seqkit rmdup {input.usvi_sequences} {input.sequences} > {output.sequences}
+        aws s3 cp {params.no_sign_request} {params.address:q} - | zstd -d > {output.sequences}
+        """
+
+rule download_s3_metadata:
+    """Download an input's metadata from S3 and decompress it"""
+    output:
+        metadata = "data/{input_name}/metadata.tsv",
+    params:
+        address = lambda w: input_sources[w.input_name]['metadata_location'],
+        no_sign_request = lambda w: "--no-sign-request" \
+            if input_sources[w.input_name]['metadata_location'].startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
+            else "",
+    shell:
+        """
+        aws s3 cp {params.no_sign_request} {params.address:q} - | zstd -d > {output.metadata}
+        """
+
+rule merge_metadata:
+    """
+    This rule should only be invoked if there are multiple defined metadata inputs
+    (config.inputs + config.additional_inputs)
+    """
+    input:
+        **{name: info['metadata'] for name, info in input_sources.items() if info.get('metadata', None)}
+    params:
+        metadata = lambda w, input: list(map("=".join, input.items())),
+        id_field = config['strain_id_field'],
+    output:
+        metadata = "results/metadata_merged.tsv"
+    shell:
+        r"""
         augur merge \
-            --metadata ingest={input.metadata} usvi={input.usvi_metadata} \
-            --metadata-id-columns accession \
-            --output-metadata {output.metadata}
-        """
\ No newline at end of file
+            --metadata {params.metadata:q} \
+            --metadata-id-columns {params.id_field} \
+            --output-metadata {output.metadata}
+        """
+
+rule merge_sequences:
+    """
+    This rule should only be invoked if there are multiple defined sequences inputs
+    (config.inputs + config.additional_inputs)
+    """
+    input:
+        **{name: info['sequences'] for name, info in input_sources.items() if info.get('sequences', None)}
+    output:
+        sequences = "results/sequences_merged.fasta"
+    shell:
+        r"""
+        seqkit rmdup {input:q} > {output.sequences:q}
+        """
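+
+# As an illustration, with the default two-input config the merge rules expand to
+# roughly:
+#   augur merge \
+#     --metadata ncbi=data/ncbi/metadata.tsv usvi=data/metadata_usvi.tsv \
+#     --metadata-id-columns accession \
+#     --output-metadata results/metadata_merged.tsv
+# and
+#   seqkit rmdup data/ncbi/sequences.fasta data/sequences_usvi.fasta > results/sequences_merged.fasta
+# Note that `seqkit rmdup` keeps the first record seen for a duplicated ID, so
+# input order (ncbi before usvi here) decides which copy survives.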
""" -rule download: - """Downloading sequences and metadata from data.nextstrain.org""" - output: - sequences = "data/sequences.fasta.zst", - metadata = "data/metadata.tsv.zst" - params: - sequences_url = config["sequences_url"], - metadata_url = config["metadata_url"], - shell: - """ - curl -fsSL --compressed {params.sequences_url:q} --output {output.sequences} - curl -fsSL --compressed {params.metadata_url:q} --output {output.metadata} - """ - -rule decompress: - """Decompressing sequences and metadata""" - input: - sequences = "data/sequences.fasta.zst", - metadata = "data/metadata.tsv.zst" - output: - sequences = "data/sequences.fasta", - metadata = "data/metadata.tsv" - shell: - """ - zstd -d -c {input.sequences} > {output.sequences} - zstd -d -c {input.metadata} > {output.metadata} - """ - rule filter: """ Filtering to @@ -58,8 +30,8 @@ rule filter: - minimum genome length of {params.min_length} (50% of Zika virus genome) """ input: - sequences = "data/sequences_all.fasta", - metadata = "data/metadata_all.tsv", + sequences = input_sequences, + metadata = input_metadata, exclude = "defaults/dropped_strains.txt", output: sequences = "results/filtered.fasta"