feat: use asyncio and subprocesses instead of fastapi's background tasks
rohan-b-84 committed Jul 27, 2024
1 parent c82bc41 commit 9148e3c
Showing 11 changed files with 268 additions and 242 deletions.
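
The change this commit makes can be distilled to the pattern below: the endpoint schedules a coroutine with `asyncio.create_task`, and that coroutine drives the analysis CLI through `asyncio.create_subprocess_exec`, replacing FastAPI's `BackgroundTasks`. A minimal, self-contained sketch; the standalone `app` object and the dummy command are illustrative, not taken from the repository:

```python
import asyncio

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()


async def run_cli_command(command: list) -> str:
    # Spawn the CLI in a separate process so the event loop stays free.
    process = await asyncio.create_subprocess_exec(
        *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        raise RuntimeError(
            f"CLI command failed with return code {process.returncode}: {stderr.decode().strip()}"
        )
    return stdout.decode().strip()


@app.post("/init")
async def initialize() -> JSONResponse:
    # Fire-and-forget: schedule the subprocess and respond immediately,
    # instead of registering the work with fastapi.BackgroundTasks.
    asyncio.create_task(run_cli_command(["python", "-c", "print('analysing')"]))
    return JSONResponse(content={"detail": "Analysis task has been queued."})
```

Unlike `BackgroundTasks`, which runs the work inside the same FastAPI process after the response has been sent, the subprocess isolates the long-running analysis from the web server.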
64 changes: 44 additions & 20 deletions src/api/endpoints.py
@@ -1,3 +1,5 @@
import asyncio
import json
import os
from typing import Dict, List

@@ -7,8 +9,6 @@
from pydantic import BaseModel

from api.sessions import session_manager
from core.input import InputData
from core.results import analyse


class InputSchema(BaseModel):
@@ -21,11 +21,26 @@ class InputSchema(BaseModel):
router = APIRouter()


async def run_cli_command(command: list):
process = await asyncio.create_subprocess_exec(
*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)

stdout, stderr = await process.communicate()

stdout = stdout.decode().strip()
stderr = stderr.decode().strip()

if process.returncode != 0:
raise RuntimeError(
f"CLI command failed with return code {process.returncode}: {stderr}"
)

return stdout


@router.post("/init")
async def initialize(
input_data: InputSchema,
background_tasks: BackgroundTasks,
) -> JSONResponse:
async def initialize(input_data: InputSchema) -> JSONResponse:
"""
Initialize the analysis process.
@@ -53,20 +68,29 @@ async def initialize(
)

session_id, result_dir = session_manager.new()
data = InputData(
nodesdb_f=session_manager.nodesdb_f,
go_mapping_f=session_manager.go_mapping_f,
pfam_mapping_f=session_manager.pfam_mapping_f,
sequence_ids_file=session_manager.sequence_ids_f,
ipr_mapping_f=session_manager.ipr_mapping_f,
cluster_file=session_manager.cluster_f,
config_data=input_data.config,
taxon_idx_mapping_file=session_manager.taxon_idx_mapping_file,
output_path=result_dir,
plot_format="png", # as we require images
)

background_tasks.add_task(analyse, data)
os.makedirs(result_dir, exist_ok=True)
config_f = os.path.join(result_dir, "config.json")

with open(config_f, "w") as file:
json.dump(input_data.config, file)

command = [
"python",
"src/main.py",
"analyse",
"-g",
session_manager.cluster_f,
"-c",
config_f,
"-s",
session_manager.sequence_ids_f,
"-m",
session_manager.taxon_idx_mapping_file,
"-o",
result_dir,
]

asyncio.create_task(run_cli_command(command))

return JSONResponse(
content={"detail": "Analysis task has been queued."},
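
One caveat with a bare `asyncio.create_task(...)` as used above: the event loop holds only a weak reference to the task, so it can be garbage-collected mid-flight and any exception it raises may go unnoticed. A hedged sketch of a small wrapper that keeps strong references and logs failures; the `spawn` helper and `_background_tasks` set are illustrative, not part of this commit:

```python
import asyncio
import logging
from typing import Set

_background_tasks: Set[asyncio.Task] = set()


def spawn(coro) -> asyncio.Task:
    """Schedule a coroutine, keep a strong reference, and log any exception."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)

    def _done(t: asyncio.Task) -> None:
        _background_tasks.discard(t)
        if not t.cancelled() and t.exception() is not None:
            logging.getLogger(__name__).error(
                "background task failed", exc_info=t.exception()
            )

    task.add_done_callback(_done)
    return task
```

With such a helper, the endpoint would call `spawn(run_cli_command(command))` instead of `asyncio.create_task(run_cli_command(command))`.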
9 changes: 7 additions & 2 deletions src/cli/commands.py
@@ -1,5 +1,6 @@
import argparse
import sys
from typing import Union

from cli.validate import validate_cli_args
from core.config import SUPPORTED_PLOT_FORMATS, SUPPORTED_TAXRANKS, SUPPORTED_TESTS
@@ -13,7 +14,7 @@ def parse_args(
pfam_mapping_f: str,
ipr_mapping_f: str,
go_mapping_f: str,
) -> ServeArgs | InputData:
) -> Union[ServeArgs, InputData]:
"""Parse command-line arguments.
Args:
@@ -68,6 +69,9 @@ def parse_args(
other_files_group.add_argument(
"-p", "--species_ids_file", help="SpeciesIDs.txt used in OrthoFinder"
)
other_files_group.add_argument(
"-m", "--taxon_idx_mapping", help="TAXON IDX Mapping File"
)
other_files_group.add_argument(
"-f",
"--functional_annotation",
@@ -179,7 +183,7 @@ def parse_args(

return InputData(
cluster_file=args.cluster_file,
config_data=args.config_file,
config_f=args.config_file,
sequence_ids_file=args.sequence_ids_file,
species_ids_file=args.species_ids_file,
functional_annotation_f=args.functional_annotation,
@@ -202,6 +206,7 @@ def parse_args(
pfam_mapping_f=pfam_mapping_f,
ipr_mapping_f=ipr_mapping_f,
go_mapping_f=go_mapping_f,
taxon_idx_mapping_file=args.taxon_idx_mapping,
)
else:
sys.exit()
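
For reference, with the `-m/--taxon_idx_mapping` option added above, the command assembled in src/api/endpoints.py corresponds to an invocation like the one sketched below; every path is a placeholder, not a file from the repository:

```python
# Placeholder paths; at runtime the API substitutes the session's real files.
command = [
    "python", "src/main.py", "analyse",
    "-g", "Orthogroups.txt",         # cluster file
    "-c", "config.json",             # config written by the /init endpoint
    "-s", "SequenceIDs.txt",         # sequence IDs file
    "-m", "taxon_idx_mapping.json",  # taxon index mapping (flag added in this commit)
    "-o", "results/",                # output directory
]
```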
16 changes: 9 additions & 7 deletions src/core/alo.py
@@ -1,4 +1,4 @@
from typing import Dict, List, Literal, Optional, Set
from typing import Dict, List, Literal, Optional, Set, Union

from core.clusters import Cluster

@@ -33,7 +33,7 @@ def __init__(self, attribute: str, level: str, proteomes: Set[str]) -> None:
"shared": [],
}

self.protein_span_by_cluster_type: Dict[str, List[int | float]] = {
self.protein_span_by_cluster_type: Dict[str, List[Union[int, float]]] = {
"singleton": [],
"specific": [],
"shared": [],
@@ -59,15 +59,17 @@ def __init__(self, attribute: str, level: str, proteomes: Set[str]) -> None:
self.domain_counter_by_domain_source_by_cluster_type = None
self.protein_with_domain_count_by_domain_source_by_cluster_type = None

self.protein_length_stats_by_cluster_id: Dict[str, Dict[str, int | float]] = {}
self.protein_length_stats_by_cluster_id: Dict[
str, Dict[str, Union[int, float]]
] = {}
self.protein_count_by_cluster_id: Dict[str, int] = {}

def add_cluster(
self,
cluster: Cluster,
attribute_cluster_type: Literal["singleton", "shared", "specific"],
ALO_cluster_status: Literal["absent", "present"],
ALO_protein_length_stats: Dict[str, int | float],
ALO_protein_length_stats: Dict[str, Union[int, float]],
ALO_protein_ids_in_cluster: List[str],
ALO_cluster_cardinality: Optional[str],
mwu_pvalue: Optional[float],
@@ -84,7 +86,7 @@ def add_cluster(
Type of the cluster as either 'singleton', 'shared', or 'specific'.
ALO_cluster_status (Literal["absent", "present"]):
Status of the cluster, either 'absent' or 'present'.
ALO_protein_length_stats (Dict[str, int | float]):
ALO_protein_length_stats (Dict[str, Union[int, float]]):
Length statistics of proteins in the cluster.
ALO_protein_ids_in_cluster (List[str]):
List of protein IDs present in the cluster.
@@ -192,7 +194,7 @@ def get_cluster_count_by_cluster_status_by_cluster_type(
]
)

def get_protein_span_by_cluster_type(self, cluster_type: str) -> int | float:
def get_protein_span_by_cluster_type(self, cluster_type: str) -> Union[int, float]:
"""
Get the total span of proteins for a specific cluster type.
@@ -201,7 +203,7 @@ def get_protein_span_by_cluster_type(self, cluster_type: str) -> int | float:
Use "total" to get the total span across all cluster types.
Returns:
int | float: Total span of proteins in the specified cluster type.
Union[int, float]: Total span of proteins in the specified cluster type.
If 'cluster_type' is "total", returns the sum of spans across all
cluster types.
"""
2 changes: 1 addition & 1 deletion src/core/alo_collections.py
@@ -186,7 +186,7 @@ def generate_header_for_node(self, node: ete3.TreeNode, dirs: Dict[str, str]):
table.scale(2, 1)
for key, cell in list(table.get_celld().items()):
row, col = key
cell._text.set_color("grey")
cell._text.set_color("grey") # type:ignore
cell.set_edgecolor("darkgrey")
cell.visible_edges = "T" if row > 0 else "B"
if row == len(data) - 2: