Skip to content

Commit

Permalink
Process traits in parallel for faster turnaround times
Browse files Browse the repository at this point in the history
  • Loading branch information
tskir committed Oct 11, 2019
1 parent 2f2cb85 commit 27e0588
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions eva_cttv_pipeline/trait_mapping/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import Counter
import csv
import logging
import progressbar
import multiprocessing

from eva_cttv_pipeline.trait_mapping.output import output_trait
from eva_cttv_pipeline.trait_mapping.oxo import get_oxo_results
Expand Down Expand Up @@ -30,8 +30,7 @@ def get_uris_for_oxo(zooma_result_list: list) -> set:
return uri_set


def process_trait(trait: Trait, filters: dict, zooma_host: str, oxo_target_list: list,
oxo_distance: int) -> Trait:
def process_trait(trait: Trait, filters: dict, zooma_host: str, oxo_target_list: list, oxo_distance: int) -> Trait:
"""
Process a single trait. Find any mappings in Zooma. If there are no high confidence Zooma
mappings that are in EFO then query OxO with any high confidence mappings not in EFO.
Expand Down Expand Up @@ -68,31 +67,32 @@ def process_trait(trait: Trait, filters: dict, zooma_host: str, oxo_target_list:
return trait


def main(input_filepath, output_mappings_filepath, output_curation_filepath, filters, zooma_host,
oxo_target_list, oxo_distance, unattended):
def main(input_filepath, output_mappings_filepath, output_curation_filepath, filters, zooma_host, oxo_target_list,
oxo_distance):
logger.info('Started parsing trait names')
trait_names_list = parse_trait_names(input_filepath)
trait_names_counter = Counter(trait_names_list)
logger.info("Loaded {} trait names".format(len(trait_names_counter)))

with open(output_mappings_filepath, "w", newline='') as mapping_file, \
open(output_curation_filepath, "wt") as curation_file:
mapping_writer = csv.writer(mapping_file, delimiter="\t")
mapping_writer.writerow(["#clinvar_trait_name", "uri", "label"])
curation_writer = csv.writer(curation_file, delimiter="\t")

trait_names_iterator = trait_names_counter.items()
if not unattended:
progress = progressbar.ProgressBar(max_value=len(trait_names_counter),
widgets=[progressbar.AdaptiveETA(samples=1000)])
trait_names_iterator = progress(trait_names_iterator)

logger.info("Loaded {} trait names".format(len(trait_names_counter)))
for i, (trait_name, freq) in enumerate(trait_names_iterator):
trait = Trait(trait_name, freq)
trait = process_trait(trait, filters, zooma_host, oxo_target_list,
oxo_distance)
logger.info('Processing trait names in parallel')
trait_list = [Trait(trait_name, freq) for trait_name, freq in trait_names_counter.items()]
trait_process_pool = multiprocessing.Pool(processes=12)

processed_trait_list = [
trait_process_pool.apply(
process_trait,
args=(trait, filters, zooma_host, oxo_target_list, oxo_distance)
)
for trait in trait_list
]

for trait in processed_trait_list:
output_trait(trait, mapping_writer, curation_writer)
if unattended and i % 100 == 0:
logger.info("Processed {} records".format(i))

logger.info('Finished processing trait names')

0 comments on commit 27e0588

Please sign in to comment.