From ab88149911d3c23683ba6a0b03f64d9299e137a0 Mon Sep 17 00:00:00 2001 From: James Stevenson Date: Wed, 3 Jan 2024 13:17:34 -0500 Subject: [PATCH] stash --- src/gene/cli.py | 2 + src/gene/etl/ensembl.py | 115 ++++++++++++++++++++++++---------------- src/gene/etl/ncbi.py | 66 ++++++++++++++--------- src/gene/etl/update.py | 2 +- 4 files changed, 114 insertions(+), 71 deletions(-) diff --git a/src/gene/cli.py b/src/gene/cli.py index b3ef3ee5..df3ba329 100644 --- a/src/gene/cli.py +++ b/src/gene/cli.py @@ -1,4 +1,5 @@ """Provides a CLI util to make updates to normalizer database.""" +import logging import os from pathlib import Path from typing import Optional, Tuple @@ -16,6 +17,7 @@ @click.group() def cli() -> None: """Manage Gene Normalizer data.""" + logging.basicConfig(filename="gene-normalizer.log", level=logging.INFO, force=True) @cli.command() diff --git a/src/gene/etl/ensembl.py b/src/gene/etl/ensembl.py index 74ac8f4b..5c803cf8 100644 --- a/src/gene/etl/ensembl.py +++ b/src/gene/etl/ensembl.py @@ -2,15 +2,19 @@ import logging import re from typing import Dict, Optional +from urllib.parse import unquote -import gffutils -from gffutils.feature import Feature +import click +import gffpandas.gffpandas as gffpd +import pandas as pd +from tqdm import tqdm from gene.etl.base import Base, GeneNormalizerEtlError from gene.schemas import ( DataLicenseAttributes, NamespacePrefix, SourceMeta, + StoredSequenceLocation, Strand, ) @@ -40,45 +44,42 @@ def _extract_data(self, use_existing: bool) -> None: def _transform_data(self) -> None: """Transform the Ensembl source.""" _logger.info("Transforming Ensembl data...") - db = gffutils.create_db( - str(self._data_file), - dbfn=":memory:", - force=True, - merge_strategy="create_unique", - keep_order=True, + df = gffpd.read_gff3(self._data_file).attributes_to_columns() + df["seq_id"] = df["seq_id"].astype(str) + df["description"] = df["description"].apply( + lambda d: unquote(d) if d is not None else None ) + accession_numbers = {} + for _, row in df[df["type"].isin(["chromosome", "scaffold"])].iterrows(): + accession_numbers[row.seq_id] = row.Alias.split(",")[-1] - # Get accession numbers - accession_numbers = dict() - for item in db.features_of_type("scaffold"): - accession_numbers[item[0]] = item[8]["Alias"][-1] - for item in db.features_of_type("chromosome"): - accession_numbers[item[0]] = item[8]["Alias"][-1] - - for f in db.all_features(): - if f.attributes.get("ID"): - f_id = f.attributes.get("ID")[0].split(":")[0] - if f_id == "gene": - gene = self._add_gene(f, accession_numbers) - self._load_gene(gene) + gene_df = df[df["ID"].str.startswith("gene", na=False)] + + if not self._silent: + click.echo(f"Loading rows from {self._data_file}:") + for _, row in tqdm( + gene_df.iterrows(), total=gene_df.shape[0], disable=self._silent, ncols=80 + ): + gene = self._add_gene(row, accession_numbers) + self._load_gene(gene) _logger.info("Ensembl data transform complete.") - def _add_gene(self, f: Feature, accession_numbers: Dict) -> Dict: + def _add_gene(self, row: pd.Series, accession_numbers: Dict) -> Dict: """Create a transformed gene record. - :param f: A gene from the data + :param row: A row from the gene data table :param accession_numbers: Accession numbers for each chromosome and scaffold :return: A gene dictionary containing data if the ID attribute exists. """ gene_params = dict() - if f.strand == "-": + if row.strand == "-": gene_params["strand"] = Strand.REVERSE.value - elif f.strand == "+": + elif row.strand == "+": gene_params["strand"] = Strand.FORWARD.value - self._add_attributes(f, gene_params) + self._add_attributes(row, gene_params) location = self._build_sequence_location( - accession_numbers[f.seqid], f, gene_params["concept_id"] + accession_numbers[row.seq_id], row, gene_params["concept_id"] ) if location: gene_params["locations"] = [location] @@ -88,28 +89,52 @@ def _add_gene(self, f: Feature, accession_numbers: Dict) -> Dict: return gene_params - def _add_attributes(self, f: Feature, gene: Dict) -> None: + def _build_sequence_location( + self, seq_id: str, row: pd.Series, concept_id: str + ) -> Optional[StoredSequenceLocation]: + """Construct a sequence location for storing in a DB. + + :param seq_id: The sequence ID. + :param row: A gene from the source file. + :param concept_id: record ID from source + :return: A storable SequenceLocation containing relevant params for returning a + VRS SequenceLocation, or None if unable to retrieve valid parameters + """ + aliases = self._get_seq_id_aliases(seq_id) + if not aliases or row.start is None or row.end is None: + return None + + sequence = aliases[0] + + if row.start != "." and row.end != "." and sequence: + if 0 <= row.start <= row.end: + return StoredSequenceLocation( + start=row.start - 1, + end=row.end, + sequence_id=sequence, + ) + else: + _logger.warning( + f"{concept_id} has invalid interval: start={row.start - 1} end={row.end}" + ) + + def _add_attributes(self, row: pd.Series, gene: Dict) -> None: """Add concept_id, symbol, and xrefs to a gene record. - :param f: A gene from the data + :param row: A gene from the data :param gene: A transformed gene record """ - for key, value in f.attributes.items(): - if key == "ID" and value[0].startswith("gene"): - gene[ - "concept_id" - ] = f"{NamespacePrefix.ENSEMBL.value}:{value[0].split(':')[1]}" - elif key == "description": - pattern = "^(.*) \\[Source:([^\\s]*)?( .*)?;Acc:(.*:)?(.*)?\\]$" - matches = re.findall(pattern, value[0]) - if matches: - gene["label"] = matches[0][0] - if matches[0][1] and matches[0][4]: - gene["xrefs"] = [self._get_xref(matches[0][1], matches[0][4])] - elif key == "Name": - gene["symbol"] = value[0] - elif key == "biotype": - gene["gene_type"] = value[0] + gene["concept_id"] = f"{NamespacePrefix.ENSEMBL.value}:{row.ID.split(':')[1]}" + gene["symbol"] = row.Name + gene["gene_type"] = row.biotype + + if row.description: + pattern = "^(.*) \\[Source:([^\\s]*)?( .*)?Acc:(.*:)?(.*)?\\]$" + matches = re.findall(pattern, row.description) + if matches: + gene["label"] = matches[0][0] + if matches[0][1] and matches[0][4]: + gene["xrefs"] = [self._get_xref(matches[0][1], matches[0][4])] def _get_xref(self, src_name: str, src_id: str) -> Optional[str]: """Get xref. diff --git a/src/gene/etl/ncbi.py b/src/gene/etl/ncbi.py index a5954bbe..fa921371 100644 --- a/src/gene/etl/ncbi.py +++ b/src/gene/etl/ncbi.py @@ -5,7 +5,9 @@ from pathlib import Path from typing import Dict, List, Optional +import gffpandas.gffpandas as gffpd import gffutils +import pandas as pd from wags_tails import NcbiGenomeData from wags_tails.ncbi import NcbiGenePaths @@ -176,26 +178,37 @@ def _get_gene_info(self, prev_symbols: Dict[str, str]) -> Dict[str, str]: params["gene_type"] = row[9] return info_genes - def _get_gene_gff(self, db: gffutils.FeatureDB, info_genes: Dict) -> None: + def _get_gene_gff(self, df: pd.DataFrame, info_genes: Dict) -> None: """Store genes from NCBI gff file. :param db: GFF database :param info_genes: A dictionary of gene's from the NCBI info file. """ - for f in db.all_features(): - if f.attributes.get("ID"): - f_id = f.attributes.get("ID")[0] - if f_id.startswith("gene"): - symbol = f.attributes["Name"][0] - if symbol in info_genes: - params: Dict = info_genes.get(symbol) # type: ignore - vrs_sq_location = self._get_vrs_sq_location(db, params, f_id) - if vrs_sq_location: - params["locations"].append(vrs_sq_location) - else: - # Need to add entire gene - gene = self._add_gff_gene(db, f, f_id) - info_genes[gene["symbol"]] = gene + for _, row in df[df["ID"].str.startswith("gene", na=False)].iterrows(): + symbol = row.Name + if symbol in info_genes: + params: Dict = info_genes[symbol] + vrs_sq_location = self._get_vrs_sq_location(df, params, row.ID) + if vrs_sq_location: + params["locations"].append(vrs_sq_location) + else: + gene = self._add_gff_gene(df, row) + info_genes[gene["symbol"]] = gene + + # for f in df.all_features(): + # if f.attributes.get("ID"): + # f_id = f.attributes.get("ID")[0] + # if f_id.startswith("gene"): + # symbol = f.attributes["Name"][0] + # if symbol in info_genes: + # params: Dict = info_genes.get(symbol) # type: ignore + # vrs_sq_location = self._get_vrs_sq_location(df, params, f_id) + # if vrs_sq_location: + # params["locations"].append(vrs_sq_location) + # else: + # # Need to add entire gene + # gene = self._add_gff_gene(df, f, f_id) + # info_genes[gene["symbol"]] = gene def _add_gff_gene( self, db: gffutils.FeatureDB, f: gffutils.Feature, f_id: str @@ -445,16 +458,19 @@ def _transform_data(self) -> None: prev_symbols = self._get_prev_symbols() info_genes = self._get_gene_info(prev_symbols) - # create db for gff file - db = gffutils.create_db( - str(self._gff_src), - dbfn=":memory:", - force=True, - merge_strategy="create_unique", - keep_order=True, - ) - - self._get_gene_gff(db, info_genes) + df = gffpd.read_gff3(self._gff_src).attributes_to_columns() + self._get_gene_gff(df, info_genes) + + # # create db for gff file + # db = gffutils.create_db( + # str(self._gff_src), + # dbfn=":memory:", + # force=True, + # merge_strategy="create_unique", + # keep_order=True, + # ) + # + # self._get_gene_gff(db, info_genes) for gene in info_genes.keys(): self._load_gene(info_genes[gene]) diff --git a/src/gene/etl/update.py b/src/gene/etl/update.py index 09a8912c..f969ed71 100644 --- a/src/gene/etl/update.py +++ b/src/gene/etl/update.py @@ -74,7 +74,7 @@ def load_source( SourceName.NCBI: NCBI, } - source_class = sources_table[source](database=db) + source_class = sources_table[source](database=db, silent=silent) try: processed_ids = source_class.perform_etl(use_existing) except GeneNormalizerEtlError as e: