Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add progress bars for ETL #319

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dynamic = ["version"]

[project.optional-dependencies]
pg = ["psycopg[binary]"]
etl = ["gffutils", "biocommons.seqrepo", "wags-tails>=0.1.1"]
etl = ["tqdm", "gffpandas", "pandas", "biocommons.seqrepo", "wags-tails>=0.1.1"]
test = ["pytest>=6.0", "pytest-cov", "mock", "httpx"]
dev = ["pre-commit", "ruff>=0.1.9"]
docs = [
Expand Down
7 changes: 7 additions & 0 deletions src/gene/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Provides a CLI util to make updates to normalizer database."""
import logging
import os
from pathlib import Path
from typing import Optional, Tuple
Expand All @@ -16,6 +17,12 @@
@click.group()
def cli() -> None:
    """Manage Gene Normalizer data.

    Running any subcommand first configures root logging to write to a file.
    """
    # Send log records to a file rather than the console.
    # NOTE(review): presumably this keeps stdout clear for the tqdm progress
    # bars added elsewhere in this change — confirm with the PR description.
    logging.basicConfig(
        filename="gene-normalizer.log",
        format="%(asctime)s %(levelname)s:%(message)s",
        level=logging.INFO,
        # force=True removes/replaces any handlers already attached to the
        # root logger (stdlib behavior; requires Python >= 3.8).
        force=True,
    )


@cli.command()
Expand Down
68 changes: 35 additions & 33 deletions src/gene/etl/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from typing import Dict, List, Optional, Union

import click
import pandas as pd
import pydantic
from biocommons.seqrepo import SeqRepo
from gffutils.feature import Feature
from wags_tails import EnsemblData, HgncData, NcbiGeneData

from gene.database import AbstractDatabase
Expand All @@ -35,13 +35,6 @@ class GeneNormalizerEtlError(Exception):
}


DATA_DISPATCH = {
SourceName.HGNC: HgncData,
SourceName.ENSEMBL: EnsemblData,
SourceName.NCBI: NcbiGeneData,
}


class Base(ABC):
"""The ETL base class."""

Expand Down Expand Up @@ -86,13 +79,13 @@ def perform_etl(self, use_existing: bool = False) -> List[str]:
uploaded.
"""
self._extract_data(use_existing)
_logger.info(f"Transforming and loading {self._src_name} data to DB...")
if not self._silent:
click.echo("Transforming and loading data to DB...")
self._print_info(
f"Transforming and loading {self._src_name.value} data to DB..."
)
self._add_meta()
self._transform_data()
self._database.complete_write_transaction()
_logger.info(f"Data load complete for {self._src_name}.")
self._print_info(f"Data load complete for {self._src_name.value}.")
return self._processed_ids

def _extract_data(self, use_existing: bool) -> None:
Expand Down Expand Up @@ -214,44 +207,53 @@ def _set_cl_interval_range(self, loc: str, arm_ix: int, location: Dict) -> None:
# return chr_location
# return None

def _get_seq_id_aliases(self, seq_id: str) -> List[str]:
    """Get GA4GH aliases for a sequence id

    :param seq_id: Sequence ID accession
    :return: List of aliases for seqid (empty if SeqRepo does not know the ID)
    """
    try:
        return self.seqrepo.translate_alias(seq_id, target_namespaces="ga4gh")
    except KeyError as e:
        # Unknown accession: warn and report no aliases rather than raising.
        _logger.warning(f"SeqRepo raised KeyError: {e}")
        return []

def _build_sequence_location(
self, seq_id: str, gene: Feature, concept_id: str
self, seq_id: str, row: pd.Series, concept_id: str
) -> Optional[StoredSequenceLocation]:
"""Construct a sequence location for storing in a DB.

:param seq_id: The sequence ID.
:param gene: A gene from the source file.
:param row: A gene from the source file.
:param concept_id: record ID from source
:return: A storable SequenceLocation containing relevant params for returning a
VRS SequenceLocation, or None if unable to retrieve valid parameters
"""
aliases = self._get_seq_id_aliases(seq_id)
if not aliases or gene.start is None or gene.end is None:
if not aliases or row.start is None or row.end is None:
return None

sequence = aliases[0]

if gene.start != "." and gene.end != "." and sequence:
if 0 <= gene.start <= gene.end:
if row.start != "." and row.end != "." and sequence:
if 0 <= row.start <= row.end:
return StoredSequenceLocation(
start=gene.start - 1,
end=gene.end,
start=row.start - 1,
end=row.end,
sequence_id=sequence,
)
else:
_logger.warning(
f"{concept_id} has invalid interval: start={gene.start - 1} end={gene.end}"
f"{concept_id} has invalid interval: start={row.start - 1} end={row.end}"
)

def _get_seq_id_aliases(self, seq_id: str) -> List[str]:
    """Get GA4GH aliases for a sequence id

    :param seq_id: Sequence ID accession
    :return: List of aliases for seqid
    """
    aliases = []
    try:
        aliases = self.seqrepo.translate_alias(seq_id, target_namespaces="ga4gh")
    except KeyError as e:
        # SeqRepo raises KeyError for unrecognized accessions; log the miss
        # and fall through so the caller receives an empty list.
        _logger.warning(f"SeqRepo raised KeyError: {e}")
    return aliases

def _print_info(self, msg: str) -> None:
    """Log information and print to console if not on silent mode.

    :param msg: message to print
    """
    # Console output is suppressed in silent mode; the log entry always happens.
    show_on_console = not self._silent
    if show_on_console:
        click.echo(msg)
    _logger.info(msg)
85 changes: 39 additions & 46 deletions src/gene/etl/ensembl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import logging
import re
from typing import Dict, Optional
from urllib.parse import unquote

import gffutils
from gffutils.feature import Feature
import gffpandas.gffpandas as gffpd
import pandas as pd
from tqdm import tqdm

from gene.etl.base import Base, GeneNormalizerEtlError
from gene.schemas import (
Expand Down Expand Up @@ -40,45 +42,41 @@ def _extract_data(self, use_existing: bool) -> None:
def _transform_data(self) -> None:
"""Transform the Ensembl source."""
_logger.info("Transforming Ensembl data...")
db = gffutils.create_db(
str(self._data_file),
dbfn=":memory:",
force=True,
merge_strategy="create_unique",
keep_order=True,
df = gffpd.read_gff3(self._data_file).attributes_to_columns()
df["seq_id"] = df["seq_id"].astype(str)
df["description"] = df["description"].apply(
lambda d: unquote(d) if d is not None else None
)
accession_numbers = {}
for _, row in df[df["type"].isin(["chromosome", "scaffold"])].iterrows():
accession_numbers[row.seq_id] = row.Alias.split(",")[-1]

# Get accession numbers
accession_numbers = dict()
for item in db.features_of_type("scaffold"):
accession_numbers[item[0]] = item[8]["Alias"][-1]
for item in db.features_of_type("chromosome"):
accession_numbers[item[0]] = item[8]["Alias"][-1]

for f in db.all_features():
if f.attributes.get("ID"):
f_id = f.attributes.get("ID")[0].split(":")[0]
if f_id == "gene":
gene = self._add_gene(f, accession_numbers)
self._load_gene(gene)
gene_df = df[df["ID"].str.startswith("gene", na=False)]

self._print_info(f"Loading rows from {self._data_file}:")
for _, row in tqdm(
gene_df.iterrows(), total=gene_df.shape[0], disable=self._silent, ncols=80
):
gene = self._add_gene(row, accession_numbers)
self._load_gene(gene)
_logger.info("Ensembl data transform complete.")

def _add_gene(self, f: Feature, accession_numbers: Dict) -> Dict:
def _add_gene(self, row: pd.Series, accession_numbers: Dict) -> Dict:
"""Create a transformed gene record.

:param f: A gene from the data
:param row: A row from the gene data table
:param accession_numbers: Accession numbers for each chromosome and scaffold
:return: A gene dictionary containing data if the ID attribute exists.
"""
gene_params = dict()
if f.strand == "-":
if row.strand == "-":
gene_params["strand"] = Strand.REVERSE.value
elif f.strand == "+":
elif row.strand == "+":
gene_params["strand"] = Strand.FORWARD.value

self._add_attributes(f, gene_params)
self._add_attributes(row, gene_params)
location = self._build_sequence_location(
accession_numbers[f.seqid], f, gene_params["concept_id"]
accession_numbers[row.seq_id], row, gene_params["concept_id"]
)
if location:
gene_params["locations"] = [location]
Expand All @@ -88,28 +86,23 @@ def _add_gene(self, f: Feature, accession_numbers: Dict) -> Dict:

return gene_params

def _add_attributes(self, f: Feature, gene: Dict) -> None:
def _add_attributes(self, row: pd.Series, gene: Dict) -> None:
"""Add concept_id, symbol, and xrefs to a gene record.

:param f: A gene from the data
:param row: A gene from the data
:param gene: A transformed gene record
"""
for key, value in f.attributes.items():
if key == "ID" and value[0].startswith("gene"):
gene[
"concept_id"
] = f"{NamespacePrefix.ENSEMBL.value}:{value[0].split(':')[1]}"
elif key == "description":
pattern = "^(.*) \\[Source:([^\\s]*)?( .*)?;Acc:(.*:)?(.*)?\\]$"
matches = re.findall(pattern, value[0])
if matches:
gene["label"] = matches[0][0]
if matches[0][1] and matches[0][4]:
gene["xrefs"] = [self._get_xref(matches[0][1], matches[0][4])]
elif key == "Name":
gene["symbol"] = value[0]
elif key == "biotype":
gene["gene_type"] = value[0]
gene["concept_id"] = f"{NamespacePrefix.ENSEMBL.value}:{row.ID.split(':')[1]}"
gene["symbol"] = row.Name
gene["gene_type"] = row.biotype

if row.description:
pattern = "^(.*) \\[Source:([^\\s]*)?( .*)?Acc:(.*:)?(.*)?\\]$"
matches = re.findall(pattern, row.description)
if matches:
gene["label"] = matches[0][0]
if matches[0][1] and matches[0][4]:
gene["xrefs"] = [self._get_xref(matches[0][1], matches[0][4])]

def _get_xref(self, src_name: str, src_id: str) -> Optional[str]:
"""Get xref.
Expand All @@ -127,7 +120,7 @@ def _get_xref(self, src_name: str, src_id: str) -> Optional[str]:
):
if src_name.startswith(prefix):
return f"{constrained_prefix.value}:{src_id}"
_logger.warning("Unrecognized source name: %:%", src_name, src_id)
_logger.warning("Unrecognized source name: %s:%s", src_name, src_id)
return None

def _add_meta(self) -> None:
Expand Down
5 changes: 4 additions & 1 deletion src/gene/etl/hgnc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import re
from typing import Dict, List

from tqdm import tqdm

from gene.etl.base import Base, GeneNormalizerEtlError
from gene.schemas import (
Annotation,
Expand All @@ -29,7 +31,8 @@ def _transform_data(self) -> None:

records = data["response"]["docs"]

for r in records:
self._print_info(f"Loading rows from {self._data_file}:")
for r in tqdm(records, total=len(records), disable=self._silent, ncols=80):
gene = {
"concept_id": r["hgnc_id"].lower(),
"symbol": r["symbol"],
Expand Down
44 changes: 36 additions & 8 deletions src/gene/etl/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from timeit import default_timer as timer
from typing import Dict, Optional, Set, Tuple

import click
from tqdm import tqdm

from gene.database import AbstractDatabase
from gene.database.database import DatabaseWriteError
from gene.schemas import GeneTypeFieldName, NamespacePrefix, RecordType, SourcePriority
Expand All @@ -13,36 +16,55 @@
class Merge:
"""Handles record merging."""

def __init__(self, database: AbstractDatabase, silent: bool = True) -> None:
    """Initialize Merge instance.

    :param database: db instance to use for record retrieval and creation.
    :param silent: if True, don't print ETL result to console
    """
    self._database = database
    # Maps each concept ID to the set of concept IDs it groups with.
    self._groups = {}
    self._silent = silent

def create_merged_concepts(self, record_ids: Set[str]) -> None:
"""Create concept groups, generate merged concept records, and update database.

:param record_ids: concept identifiers from which groups should be generated.
Should *not* include any records from excluded sources.
"""
_logger.info("Generating record ID sets...")
start = timer()
for record_id in record_ids:
msg = "Generating record ID sets..."
if not self._silent:
click.echo(msg)
_logger.info(msg)

for record_id in tqdm(
record_ids, total=len(record_ids), ncols=80, disable=self._silent
):
new_group = self._create_record_id_set(record_id)
if new_group:
for concept_id in new_group:
self._groups[concept_id] = new_group
end = timer()
_logger.debug(f"Built record ID sets in {end - start} seconds")
msg = f"Built record ID sets in {end - start} seconds"
if not self._silent:
click.echo(msg)
_logger.info(msg)

self._groups = {k: v for k, v in self._groups.items() if len(v) > 1}

_logger.info("Creating merged records and updating database...")
msg = "Creating merged records and updating database..."
if not self._silent:
click.echo(msg)
_logger.info(msg)
uploaded_ids = set()
start = timer()
for record_id, group in self._groups.items():
for record_id, group in tqdm(
self._groups.items(),
total=len(self._groups),
ncols=80,
disable=self._silent,
):
if record_id in uploaded_ids:
continue
merged_record = self._generate_merged_record(group)
Expand All @@ -65,9 +87,15 @@ def create_merged_concepts(self, record_ids: Set[str]) -> None:
_logger.error(str(dw))
uploaded_ids |= group
self._database.complete_write_transaction()
_logger.info("Merged concept generation successful.")
msg = "Merged concept generation successful."
if not self._silent:
click.echo(msg)
_logger.info(msg)
end = timer()
_logger.debug(f"Generated and added concepts in {end - start} seconds")
msg = f"Generated and added concepts in {end - start} seconds"
if not self._silent:
click.echo(msg)
_logger.debug(msg)

def _create_record_id_set(
self, record_id: str, observed_id_set: Optional[Set] = None
Expand Down
Loading