diff --git a/platformics/support/format_handlers.py b/platformics/support/format_handlers.py
index 56a85873..6f4f7be0 100644
--- a/platformics/support/format_handlers.py
+++ b/platformics/support/format_handlers.py
@@ -9,6 +9,7 @@
 from Bio import SeqIO
 
 from typing import Protocol
+
 from mypy_boto3_s3 import S3Client
@@ -101,12 +102,19 @@ class JsonHandler(FileFormatHandler):
     def validate(self) -> None:
         json.loads(self.contents())  # throws an exception for invalid JSON
 
+class ZipHandler(FileFormatHandler):
+    """
+    Validate ZIP files
+    """
+
+    def validate(self) -> None:
+        assert self.key.endswith(".zip")  # throws an exception if the file is not a zip file
 
 def get_validator(format: str) -> type[FileFormatHandler]:
     """
     Returns the validator for a given file format
     """
-    if format == "fasta":
+    if format in ["fa", "fasta"]:
         return FastaHandler
     elif format == "fastq":
         return FastqHandler
@@ -114,5 +122,7 @@ def get_validator(format: str) -> type[FileFormatHandler]:
         return BedHandler
     elif format == "json":
         return JsonHandler
+    elif format == "zip":
+        return ZipHandler
     else:
         raise Exception(f"Unknown file format '{format}'")
diff --git a/workflows/.happy/terraform/envs/dev/iam_policies.tf b/workflows/.happy/terraform/envs/dev/iam_policies.tf
index 0f53e48f..de32f2b8 100644
--- a/workflows/.happy/terraform/envs/dev/iam_policies.tf
+++ b/workflows/.happy/terraform/envs/dev/iam_policies.tf
@@ -39,7 +39,8 @@ data "aws_iam_policy_document" "workflows" {
       "states:StartExecution"
     ]
     resources = [
-      "arn:aws:states:us-west-2:${var.aws_account_id}:stateMachine:idseq-swipe-dev-default-wdl"
+      "arn:aws:states:us-west-2:${var.aws_account_id}:stateMachine:idseq-swipe-dev-default-wdl",
+      "arn:aws:states:us-west-2:${var.aws_account_id}:execution:idseq-swipe-dev-default-wdl:*"
     ]
   }
   statement {
diff --git a/workflows/.happy/terraform/envs/sandbox/iam_policies.tf b/workflows/.happy/terraform/envs/sandbox/iam_policies.tf
index 07088a24..ebfce817 100644
--- a/workflows/.happy/terraform/envs/sandbox/iam_policies.tf
+++ b/workflows/.happy/terraform/envs/sandbox/iam_policies.tf
@@ -39,7 +39,8 @@ data "aws_iam_policy_document" "workflows" {
       "states:StartExecution"
     ]
     resources = [
-      "arn:aws:states:us-west-2:${var.aws_account_id}:stateMachine:idseq-swipe-sandbox-default-wdl"
+      "arn:aws:states:us-west-2:${var.aws_account_id}:stateMachine:idseq-swipe-sandbox-default-wdl",
+      "arn:aws:states:us-west-2:${var.aws_account_id}:execution:idseq-swipe-sandbox-default-wdl:*"
     ]
   }
   statement {
diff --git a/workflows/.happy/terraform/envs/staging/iam_policies.tf b/workflows/.happy/terraform/envs/staging/iam_policies.tf
index 98f564dc..7f7c4f46 100644
--- a/workflows/.happy/terraform/envs/staging/iam_policies.tf
+++ b/workflows/.happy/terraform/envs/staging/iam_policies.tf
@@ -39,7 +39,8 @@ data "aws_iam_policy_document" "workflows" {
       "states:StartExecution"
    ]
     resources = [
-      "arn:aws:states:us-west-2:${var.aws_account_id}:stateMachine:idseq-swipe-staging-default-wdl"
+      "arn:aws:states:us-west-2:${var.aws_account_id}:stateMachine:idseq-swipe-staging-default-wdl",
+      "arn:aws:states:us-west-2:${var.aws_account_id}:execution:idseq-swipe-staging-default-wdl:*"
     ]
   }
   statement {
diff --git a/workflows/plugins/event_bus/swipe/event_bus_swipe.py b/workflows/plugins/event_bus/swipe/event_bus_swipe.py
index 0e2f7326..8e75a1b3 100644
--- a/workflows/plugins/event_bus/swipe/event_bus_swipe.py
+++ b/workflows/plugins/event_bus/swipe/event_bus_swipe.py
@@ -67,7 +67,7 @@ def _parse_message(self, message: dict) -> WorkflowStatusMessage | None:
         # TODO: handle aws.batch for step statuses
         if not message.get("source") == "aws.states":
             return None
-        status = self._create_workflow_status(message["status"])
+        status = self._create_workflow_status(message["detail"]["status"])
         execution_arn = message["detail"]["executionArn"]
         if status == "WORKFLOW_SUCCESS":
             return WorkflowSucceededMessage(
diff --git a/workflows/plugins/input_loaders/czid_workflows/bulk_download_input.py b/workflows/plugins/input_loaders/czid_workflows/bulk_download_input.py
new file mode 100644
index 00000000..9f4a7f4d
--- /dev/null
+++ b/workflows/plugins/input_loaders/czid_workflows/bulk_download_input.py
@@ -0,0 +1,77 @@
+from sgqlc.operation import Operation
+
+from database.models.workflow_version import WorkflowVersion
+from manifest.manifest import EntityInput, Primitive
+from platformics.client.entities_schema import (
+    ConsensusGenomeWhereClause,
+    Query,
+    UUIDComparators,
+)
+from platformics.util.types_utils import JSONValue
+from plugins.plugin_types import InputLoader
+
+PUBLIC_REFERENCES_PREFIX = "s3://czid-public-references/consensus-genome"
+CG_BULK_DOWNLOAD_OUTPUT = "consensus_genome_intermediate_output_files"
+CG_BULK_DOWNLOAD_CONSENSUS = "consensus_genome"
+CG_BULK_DOWNLOADS = [CG_BULK_DOWNLOAD_CONSENSUS, CG_BULK_DOWNLOAD_OUTPUT]
+
+
+class BulkDownloadInputLoader(InputLoader):
+    async def load(
+        self,
+        workflow_version: WorkflowVersion,
+        entity_inputs: dict[str, EntityInput | list[EntityInput]],
+        raw_inputs: dict[str, Primitive | list[Primitive]],
+        requested_outputs: list[str] = [],
+    ) -> dict[str, JSONValue]:
+        inputs: dict[str, JSONValue] = {}
+        if raw_inputs.get("bulk_download_type") in CG_BULK_DOWNLOADS:
+            consensus_genome_input = entity_inputs["consensus_genomes"]
+            op = Operation(Query)
+            if isinstance(consensus_genome_input, EntityInput):
+                # if single input
+                consensus_genome = op.consensus_genomes(
+                    where=ConsensusGenomeWhereClause(id=UUIDComparators(_eq=consensus_genome_input.entity_id))
+                )
+            else:
+                # must be list of inputs
+                consensus_genome = op.consensus_genomes(
+                    where=ConsensusGenomeWhereClause(
+                        id=UUIDComparators(_in=[cg.entity_id for cg in consensus_genome_input])
+                    )
+                )
+            consensus_genome.sequencing_read()
+            consensus_genome.sequencing_read.sample()
+            consensus_genome.sequencing_read.sample.id()
+            consensus_genome.sequencing_read.sample.name()
+            consensus_genome.accession()
+            consensus_genome.accession.accession_id()
+            if raw_inputs.get("bulk_download_type") == CG_BULK_DOWNLOAD_OUTPUT:
+                self._fetch_file(consensus_genome.intermediate_outputs())
+            elif raw_inputs.get("bulk_download_type") == CG_BULK_DOWNLOAD_CONSENSUS:
+                self._fetch_file(consensus_genome.sequence())
+            res = self._entities_gql(op)
+            files: list[dict[str, Primitive | None]] = []
+            for cg_res in res["consensusGenomes"]:
+                sample_name = f"{cg_res['sequencingRead']['sample']['name']}"
+                sample_id = f"{cg_res['sequencingRead']['sample']['id']}"
+                if cg_res["accession"]:
+                    accession = f"{cg_res['accession']['accessionId']}"
+                    output_name = f"{sample_name}_{sample_id}_{accession}"
+                else:
+                    output_name = f"{sample_name}_{sample_id}"
+
+                if raw_inputs.get("bulk_download_type") == CG_BULK_DOWNLOAD_OUTPUT:
+                    download_link = self._uri_file(cg_res["intermediateOutputs"])
+                    suffix = ".zip"
+                elif raw_inputs.get("bulk_download_type") == CG_BULK_DOWNLOAD_CONSENSUS:
+                    download_link = self._uri_file(cg_res["sequence"])
+                    suffix = ".fa"
+                files.append(
+                    {
+                        "output_name": output_name + suffix,
+                        "file_path": download_link,
+                    }
+                )
+            inputs["files"] = files  # type: ignore
+        return inputs
diff --git a/workflows/plugins/input_loaders/czid_workflows/setup.py b/workflows/plugins/input_loaders/czid_workflows/setup.py
index d27dabc4..bf987ab7 100644
--- a/workflows/plugins/input_loaders/czid_workflows/setup.py
+++ b/workflows/plugins/input_loaders/czid_workflows/setup.py
@@ -10,13 +10,14 @@
     long_description="",
     long_description_content_type="text/markdown",
     author="Todd Morse",
-    py_modules=["consensus_genome_input"],
+    py_modules=["consensus_genome_input", "bulk_download_input"],
     python_requires=">=3.6",
     setup_requires=[],
     reentry_register=True,
     entry_points={
         "czid.plugin.input_loader": [
             "consensus_genome = consensus_genome_input:ConsensusGenomeInputLoader",
+            "bulk_download = bulk_download_input:BulkDownloadInputLoader",
         ],
     },
 )
diff --git a/workflows/plugins/output_loaders/entities/bulk_download_output.py b/workflows/plugins/output_loaders/entities/bulk_download_output.py
index 3f34c2f9..f8b1018a 100644
--- a/workflows/plugins/output_loaders/entities/bulk_download_output.py
+++ b/workflows/plugins/output_loaders/entities/bulk_download_output.py
@@ -28,6 +28,10 @@ async def load(
         file_path = workflow_outputs["file"]
         assert isinstance(file_path, str)
 
+        file_format = file_path.split(".")[-1]
+        # if file_path ends with .txt, change file_format to fasta
+        file_format = "fasta" if file_format == "txt" else file_format
+
         bulk_download = op.create_bulk_download(
             input=BulkDownloadCreateInput(
                 producing_run_id=ID(workflow_run.id),
@@ -43,7 +47,7 @@
         file = op.create_file(
             entity_id=bulk_download_id,
             entity_field_name="file",
-            file=FileCreate(name="file", file_format="fasta", **self._parse_uri(file_path)),
+            file=FileCreate(name="file", file_format=file_format, **self._parse_uri(file_path)),
         )
         file.id()
         self._entities_gql(op)
diff --git a/workflows/plugins/output_loaders/entities/consensus_genome_output.py b/workflows/plugins/output_loaders/entities/consensus_genome_output.py
index e29b783a..eb4528a9 100644
--- a/workflows/plugins/output_loaders/entities/consensus_genome_output.py
+++ b/workflows/plugins/output_loaders/entities/consensus_genome_output.py
@@ -69,7 +69,7 @@ async def load(
         reference_genome_id: ID | None = None
         if reference_genome_input:
             assert isinstance(reference_genome_input, EntityInput)
-            reference_genome_id = ID(reference_genome_input.entity_id)
+            reference_genome_id = ID(reference_genome_input.entity_id)  # type: ignore
         consensus_genome = op.create_consensus_genome(
             input=ConsensusGenomeCreateInput(
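
Not part of the diff: a minimal sketch of how the extended format dispatch in platformics/support/format_handlers.py is expected to behave once this change lands. The import path assumes the repo layout above; handler construction (S3 client, bucket, key) is left to the existing FileFormatHandler machinery.

    from platformics.support.format_handlers import ZipHandler, get_validator

    # "zip" now resolves to the new handler, and "fa" is accepted as an alias of "fasta".
    assert get_validator("zip") is ZipHandler
    assert get_validator("fa") is get_validator("fasta")

    # Unknown formats still fall through to the final else branch and raise.
    try:
        get_validator("tsv")
    except Exception as exc:
        print(exc)  # Unknown file format 'tsv'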
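
Not part of the diff: an abridged EventBridge "Step Functions Execution Status Change" event illustrating why _parse_message reads the status from message["detail"] rather than the top level; the field values here are placeholders.

    message = {
        "source": "aws.states",
        "detail": {
            "executionArn": "arn:aws:states:us-west-2:123456789012:execution:idseq-swipe-dev-default-wdl:example-run",
            "status": "SUCCEEDED",
        },
    }

    # The fix above reads the nested field; there is no top-level "status" key to read.
    assert "status" not in message
    assert message["detail"]["status"] == "SUCCEEDED"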
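
Not part of the diff: the file_format derivation added to bulk_download_output.py, pulled out into a hypothetical helper (derive_file_format is not a name in the codebase) to show the intended mapping.

    def derive_file_format(file_path: str) -> str:
        # Take the extension, but report ".txt" bulk downloads as "fasta", mirroring the diff.
        file_format = file_path.split(".")[-1]
        return "fasta" if file_format == "txt" else file_format

    assert derive_file_format("s3://bucket/run/consensus.fa") == "fa"
    assert derive_file_format("s3://bucket/run/combined_consensus.txt") == "fasta"
    assert derive_file_format("s3://bucket/run/intermediates.zip") == "zip"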