issue #1153 - speed up variant tags task
davmlaw committed Sep 5, 2024
1 parent a894d49 commit 5d7c5e5
Showing 3 changed files with 32 additions and 33 deletions.
2 changes: 1 addition & 1 deletion snpdb/liftover.py
@@ -141,7 +141,7 @@ def _get_build_liftover_dicts(alleles: Iterable[Allele], inserted_genome_build:
         existing_builds = allele_builds[allele.pk]
         for genome_build in other_builds:
             if genome_build.pk in existing_builds:
-                logging.info("%s already lifted over to %s", allele, genome_build)
+                # logging.info("%s already lifted over to %s", allele, genome_build)
                 continue

             # Now try different liftover methods (same contig, using dest build coords, using other builds and tool)
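
Note on the snpdb/liftover.py change above: the silenced logging.info call sat inside a per-allele, per-build loop, so a large import could emit one log record for every skipped pair. A minimal, self-contained sketch of why that matters for a speed-up commit (toy names, not code from this repository):

    import logging
    import time

    logging.basicConfig(level=logging.INFO)

    def skip_already_done(items, log_each):
        """Toy loop comparing per-item INFO logging with a single summary line."""
        start = time.time()
        skipped = 0
        for item in items:
            if log_each:
                logging.info("%s already lifted over", item)  # one log record per item
            skipped += 1
        logging.info("Skipped %d items in %.3fs", skipped, time.time() - start)

    skip_already_done(range(50_000), log_each=True)   # tens of thousands of log records
    skip_already_done(range(50_000), log_each=False)  # one summary record
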
61 changes: 30 additions & 31 deletions upload/tasks/import_variant_tags_task.py
@@ -1,4 +1,5 @@
 import logging
+import time
 from collections import defaultdict

 import pandas as pd
@@ -12,7 +13,8 @@
 from library.pandas_utils import df_nan_to_none
 from library.utils import invert_dict
 from snpdb.liftover import create_liftover_pipelines
-from snpdb.models import GenomeBuild, Variant, ImportSource, Tag, VariantAllele, VariantCoordinate, Allele
+from snpdb.models import GenomeBuild, ImportSource, Tag, VariantAllele, VariantCoordinate, Allele
+from snpdb.variant_pk_lookup import VariantPKLookup
 from upload.models import UploadedVariantTags, UploadStep, ModifiedImportedVariant, SimpleVCFImportInfo
 from upload.tasks.vcf.import_vcf_step_task import ImportVCFStepTask
 from variantgrid.celery import app
@@ -138,50 +140,47 @@ def process_items(self, upload_step: UploadStep):
         logging.info("_create_tags_from_variant_tags_import: %s!!", variant_tags_import)

         genome_build = variant_tags_import.genome_build
+        variant_pk_lookup = VariantPKLookup(genome_build)

         tag_cache = {}
         user_matcher = UserMatcher(default_user=variant_tags_import.user)
         variant_tags = []
         created_date = []
-        ivt_variants = {}
+        ivt_variant_hashes = {}
         vts_qs = variant_tags_import.importedvarianttag_set.all()
-        # We often have a lot of tags per variant - so do in order, then can re-use lookup code
-        last_variant = None
-        last_variant_string = None
-        for ivt in vts_qs.order_by("variant_string"):
-            if ivt.variant_string == last_variant_string:
-                variant = last_variant
-            else:
-                variant_coordinate = VariantCoordinate.from_string(ivt.variant_string)
-                try:
-                    variant = Variant.get_from_variant_coordinate(variant_coordinate, genome_build)
-                except Variant.DoesNotExist:
-                    # Must have been normalized
-                    try:
-                        variant = ModifiedImportedVariant.get_variant_for_unnormalized_variant(upload_step.upload_pipeline,
-                                                                                               variant_coordinate)
-                    except ModifiedImportedVariant.DoesNotExist as mvi:
-                        msg = f"Could not find tag variant '{ivt.variant_string}' as Variant or ModifiedImportedVariant"
-                        raise ValueError(msg) from mvi
+        for i, ivt in enumerate(vts_qs.order_by("variant_string")):
+            variant_coordinate = VariantCoordinate.from_string(ivt.variant_string)
+            ivt_variant_hashes[ivt] = variant_pk_lookup.get_variant_coordinate_hash(variant_coordinate)

-            ivt_variants[ivt] = variant
+        # Retrieve them all. Some may be None as they were modified/normalized
+        variant_hashes = list(set(ivt_variant_hashes.values()))
+        variant_ids_inc_null = variant_pk_lookup.get_variant_ids(variant_hashes, validate_not_null=False)
+        variant_ids_by_hash = dict(zip(variant_hashes, variant_ids_inc_null))

-            last_variant = variant
-            last_variant_string = ivt.variant_string
+        ivt_variants = {}
+        for ivt, variant_hash in ivt_variant_hashes.items():
+            variant_id = variant_ids_by_hash.get(variant_hash)
+            if variant_id is None:
+                variant_coordinate = VariantCoordinate.from_string(ivt.variant_string)
+                try:
+                    variant = ModifiedImportedVariant.get_variant_for_unnormalized_variant(upload_step.upload_pipeline,
+                                                                                           variant_coordinate)
+                except ModifiedImportedVariant.DoesNotExist as mvi:
+                    msg = f"Could not find tag variant '{ivt.variant_string}' as Variant or ModifiedImportedVariant"
+                    raise ValueError(msg) from mvi
+                variant_id = variant.pk
+            ivt_variants[ivt] = variant_id

         logging.info("Loaded variants")
         # The Alleles would have been made in BulkClinGenAlleleVCFProcessor
-        variants = set(ivt_variants.values())
+        variant_ids = set(ivt_variants.values())
         # populate_clingen_alleles_for_variants(genome_build, variants)

-        va_qs = VariantAllele.objects.filter(variant__in=variants, genome_build=genome_build)
+        va_qs = VariantAllele.objects.filter(variant__in=variant_ids, genome_build=genome_build)
         allele_id_by_variant_id = dict(va_qs.values_list("variant_id", "allele_id"))
         logging.info("Loaded Alleles")

-        for i, (ivt, variant) in enumerate(ivt_variants.items()):
-            if i and i % 1000:
-                logging.info("Processed %d Imported Variant Tags", i)
-
+        for ivt, variant_id in ivt_variants.items():
             tag = tag_cache.get(ivt.tag_string)
             if tag is None:
                 tag, _ = Tag.objects.get_or_create(pk=ivt.tag_string)
@@ -190,10 +189,10 @@ def process_items(self, upload_step: UploadStep):
             # We're not going to link analysis/nodes - as probably don't match up across systems
             analysis = None
             node = None
-            allele_id = allele_id_by_variant_id.get(variant.pk)
+            allele_id = allele_id_by_variant_id.get(variant_id)

             # TODO: We should also look at not creating dupes somehow??
-            vt = VariantTag(variant=variant,
+            vt = VariantTag(variant_id=variant_id,
                             allele_id=allele_id,
                             genome_build=genome_build,
                             tag=tag,
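
Note on the upload/tasks/import_variant_tags_task.py change above: the old loop resolved each tag's variant with its own ORM query (plus a last-seen cache for repeated variant strings), whereas the new code hashes every coordinate up front, fetches all variant IDs in one bulk VariantPKLookup call, and falls back to a per-row ModifiedImportedVariant lookup only where the bulk result is None (the variant was normalized on import). A generic sketch of that resolve-in-bulk-then-patch-the-misses pattern, using plain callables rather than the VariantPKLookup API (all names below are illustrative):

    from typing import Callable, Dict, Hashable, Iterable, Optional

    def batch_resolve(keys: Iterable[str],
                      hash_key: Callable[[str], Hashable],
                      bulk_lookup: Callable[[list], Dict[Hashable, Optional[int]]],
                      slow_fallback: Callable[[str], int]) -> Dict[str, int]:
        """Resolve many keys with one bulk query, using a per-key query only for misses."""
        hashes = {key: hash_key(key) for key in keys}           # cheap in-memory pass
        ids_by_hash = bulk_lookup(list(set(hashes.values())))   # single round trip
        resolved = {}
        for key, key_hash in hashes.items():
            found_id = ids_by_hash.get(key_hash)
            if found_id is None:                                 # miss, e.g. normalized on import
                found_id = slow_fallback(key)                    # per-key query for misses only
            resolved[key] = found_id
        return resolved

    # Toy usage: two keys resolve in the bulk pass, the third needs the fallback.
    table = {"h1": 10, "h2": 11}
    print(batch_resolve(["k1", "k2", "k3"],
                        hash_key={"k1": "h1", "k2": "h2", "k3": "h3"}.get,
                        bulk_lookup=lambda hs: {h: table.get(h) for h in hs},
                        slow_fallback=lambda key: 99))  # -> {'k1': 10, 'k2': 11, 'k3': 99}

The win comes from replacing N database round trips with one, at the cost of holding all the hashes in memory; the fallback keeps correctness for the rows the bulk path cannot resolve.
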
2 changes: 1 addition & 1 deletion upload/tasks/vcf/import_vcf_tasks.py
@@ -114,7 +114,7 @@ def _get_vcf_processor(self, upload_step, preprocess_vcf_import_info):
         return BulkMinimalVCFProcessor(upload_step, preprocess_vcf_import_info)


-class ProcessVCFLinkAllelesSetMaxVariantTask(ImportVCFStepTask):
+class ProcessVCFLinkAllelesSetMaxVariantTask(AbstractProcessVCFTask):
     """ Link Alleles provided as the ID column in VCF
         Finds highest variant_id in VCF so we can tell whether we're done annotating or not
         Can run in parallel on split VCFs """
