feat!: use wags-tails for data acquisition (#303)
jsstevenson authored Dec 4, 2023
1 parent 5f678bc commit c747340
Showing 11 changed files with 95 additions and 491 deletions.
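In brief: this commit removes the hand-rolled FTP download and version-checking machinery from the ETL classes and delegates data acquisition to the wags-tails library. A minimal sketch of the acquisition pattern adopted below — the constructor arguments (`data_dir`, `silent`) and the `(path, version)` pair returned by `get_latest(from_local=...)` are used exactly as they appear in the diff; the concrete directory value is illustrative:

```python
from pathlib import Path

from wags_tails import HgncData

# Fetch the latest HGNC source file, or reuse the newest local copy
# when from_local=True; returns the file path and a version string.
fetcher = HgncData(data_dir=Path("data/hgnc"), silent=False)
data_file, version = fetcher.get_latest(from_local=False)
print(f"HGNC {version}: {data_file}")
```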
1 change: 1 addition & 0 deletions Pipfile

@@ -15,6 +15,7 @@ boto3 = "*"
 gene = {editable = true, path = "."}
 gffutils = "*"
 "biocommons.seqrepo" = "*"
+wags-tails = ">=0.1.1"
 psycopg = {version = "*", extras=["binary"]}
 pytest = "*"
 pre-commit = "*"
6 changes: 1 addition & 5 deletions pyproject.toml

@@ -36,13 +36,9 @@ dynamic = ["version"]
 
 [project.optional-dependencies]
 pg = ["psycopg[binary]"]
-
-etl = ["gffutils", "biocommons.seqrepo"]
-
+etl = ["gffutils", "biocommons.seqrepo", "wags-tails>=0.1.1"]
 test = ["pytest>=6.0", "pytest-cov", "mock", "httpx"]
-
 dev = ["pre-commit", "ruff>=0.1.2"]
-
 docs = [
     "sphinx==6.1.3",
     "sphinx-autodoc-typehints==1.22.0",
2 changes: 1 addition & 1 deletion src/gene/cli.py

@@ -186,7 +186,7 @@ def _load_source(
         click.get_current_context().exit()
     SourceClass = eval(n.value)  # noqa: N806
 
-    source = SourceClass(database=db)
+    source = SourceClass(database=db, silent=False)
    try:
         processed_ids += source.perform_etl(use_existing)
     except GeneNormalizerEtlError as e:
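The practical effect of `silent=False` here: the CLI opts into console output (wags-tails download progress plus the new "Transforming and loading data to DB..." echo in `perform_etl`), while the new `Base` default of `silent=True` keeps programmatic use quiet. A sketch of the two call styles — the `create_db` factory and the `Ensembl` import path are assumptions, not shown in this diff:

```python
from gene.database import create_db  # assumed factory; not shown in this diff
from gene.etl.ensembl import Ensembl  # assumed import path

db = create_db()

# CLI-style: print progress and ETL results to the console.
chatty = Ensembl(database=db, silent=False)

# Library-style: rely on the new silent=True default.
quiet = Ensembl(database=db)
concept_ids = quiet.perform_etl(use_existing=True)
```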
148 changes: 46 additions & 102 deletions src/gene/etl/base.py

@@ -1,19 +1,15 @@
 """A base class for extraction, transformation, and loading of data."""
-import datetime
-import gzip
 import logging
-import re
-import shutil
 from abc import ABC, abstractmethod
-from ftplib import FTP
-from os import remove
 from pathlib import Path
-from typing import Callable, Dict, List, Optional
+from typing import Dict, List, Optional, Union
 
 import click
 import pydantic
 from biocommons.seqrepo import SeqRepo
-from dateutil import parser
 from gffutils.feature import Feature
+from wags_tails import EnsemblData, HgncData, NcbiGeneData
 
 from gene import ITEM_TYPES, SEQREPO_ROOT_DIR
 from gene.database import AbstractDatabase
@@ -23,50 +19,75 @@
 logger.setLevel(logging.DEBUG)
 
 
+DATA_DISPATCH = {
+    SourceName.HGNC: HgncData,
+    SourceName.ENSEMBL: EnsemblData,
+    SourceName.NCBI: NcbiGeneData,
+}
+
+
 class Base(ABC):
     """The ETL base class."""
 
     def __init__(
         self,
         database: AbstractDatabase,
-        host: str,
-        data_dir: str,
-        src_data_dir: Path,
         seqrepo_dir: Path = SEQREPO_ROOT_DIR,
+        data_path: Optional[Path] = None,
+        silent: bool = True,
     ) -> None:
         """Instantiate Base class.
         :param database: database instance
-        :param host: Hostname of FTP site
-        :param data_dir: Data directory of FTP site to look at
-        :param src_data_dir: Data directory for source
         :param seqrepo_dir: Path to seqrepo directory
+        :param data_path: path to app data directory
+        :param silent: if True, don't print ETL result to console
         """
-        self.src_data_dir = src_data_dir
-        self.src_data_dir.mkdir(exist_ok=True, parents=True)
+        self._silent = silent
         self._src_name = SourceName(self.__class__.__name__)
+        self._data_source = self._get_data_handler(data_path)
         self._database = database
-        self._host = host
-        self._data_dir = data_dir
-        self._processed_ids = list()
         self.seqrepo = self.get_seqrepo(seqrepo_dir)
+        self._processed_ids = list()
 
+    def _get_data_handler(
+        self, data_path: Optional[Path] = None
+    ) -> Union[HgncData, EnsemblData, NcbiGeneData]:
+        """Construct data handler instance for source. Overwrite for edge-case sources.
+        :param data_path: location of data storage
+        :return: instance of wags_tails.DataSource to manage source file(s)
+        """
+        return DATA_DISPATCH[self._src_name](data_dir=data_path, silent=self._silent)
+
     def perform_etl(self, use_existing: bool = False) -> List[str]:
-        """Extract, Transform, and Load data into database.
+        """Public-facing method to begin ETL procedures on given data.
         Returned concept IDs can be passed to Merge method for computing
         merged concepts.
-        :param use_existing: if true, use most recent available local files
-        :return: Concept IDs of concepts successfully loaded
+        :param use_existing: if True, don't try to retrieve latest source data
+        :return: list of concept IDs which were successfully processed and
+            uploaded.
         """
         self._extract_data(use_existing)
+        if not self._silent:
+            click.echo("Transforming and loading data to DB...")
         self._add_meta()
         self._transform_data()
         self._database.complete_write_transaction()
         return self._processed_ids
 
-    @abstractmethod
-    def _extract_data(self, *args, **kwargs) -> None:  # noqa: ANN002
-        """Extract data from FTP site or local data directory."""
-        raise NotImplementedError
+    def _extract_data(self, use_existing: bool) -> None:
+        """Acquire source data.
+        This method is responsible for initializing an instance of a data handler and,
+        in most cases, setting ``self._data_file`` and ``self._version``.
+        :param bool use_existing: if True, don't try to fetch latest source data
+        """
+        self._data_file, self._version = self._data_source.get_latest(
+            from_local=use_existing
+        )
 
     @abstractmethod
     def _transform_data(self) -> None:
@@ -78,40 +99,6 @@ def _add_meta(self) -> None:
         """Add source meta to database source info."""
         raise NotImplementedError
 
-    def _acquire_data_file(
-        self,
-        file_glob: str,
-        use_existing: bool,
-        check_latest_callback: Callable[[Path], bool],
-        download_callback: Callable[[], Path],
-    ) -> Path:
-        """Acquire data file.
-        :param file_glob: pattern to match relevant files against
-        :param use_existing: don't fetch from remote origin if local versions are
-            available
-        :param check_latest_callback: function to check whether local data is up-to-date
-        :param download_callback: function to download from remote
-        :return: path to acquired data file
-        :raise FileNotFoundError: if unable to find any files matching the pattern
-        """
-        matching_files = list(self.src_data_dir.glob(file_glob))
-        if not matching_files:
-            if use_existing:
-                raise FileNotFoundError(
-                    f"No local files matching pattern {self.src_data_dir.absolute().as_uri() + file_glob}"
-                )
-            else:
-                return download_callback()
-        else:
-            latest_file = list(sorted(matching_files))[-1]
-            if use_existing:
-                return latest_file
-            if not check_latest_callback(latest_file):
-                return download_callback()
-            else:
-                return latest_file
-
     def _load_gene(self, gene: Dict) -> None:
         """Load a gene record into database. This method takes responsibility for:
             * validating structure correctness
@@ -142,49 +129,6 @@ def _load_gene(self, gene: Dict) -> None:
         self._database.add_record(gene, self._src_name)
         self._processed_ids.append(concept_id)
 
-    def _ftp_download(
-        self, host: str, data_dir: str, fn: str, source_dir: Path, data_fn: str
-    ) -> Optional[str]:
-        """Download data file from FTP site.
-        :param host: Source's FTP host name
-        :param data_dir: Data directory located on FTP site
-        :param fn: Filename for downloaded file
-        :param source_dir: Source's data directory
-        :param data_fn: Filename on FTP site to be downloaded
-        :return: Date file was last updated
-        """
-        with FTP(host) as ftp:
-            ftp.login()
-            timestamp = ftp.voidcmd(f"MDTM {data_dir}{data_fn}")[4:].strip()
-            date = str(parser.parse(timestamp)).split()[0]
-            version = datetime.datetime.strptime(date, "%Y-%m-%d").strftime("%Y%m%d")
-            ftp.cwd(data_dir)
-            self._ftp_download_file(ftp, data_fn, source_dir, fn)
-        return version
-
-    def _ftp_download_file(
-        self, ftp: FTP, data_fn: str, source_dir: Path, fn: str
-    ) -> None:
-        """Download data file from FTP
-        :param ftp: FTP instance
-        :param data_fn: Filename on FTP site to be downloaded
-        :param source_dir: Source's data directory
-        :param fn: Filename for downloaded file
-        """
-        if data_fn.endswith(".gz"):
-            filepath = source_dir / f"{fn}.gz"
-        else:
-            filepath = source_dir / fn
-        with open(filepath, "wb") as fp:
-            ftp.retrbinary(f"RETR {data_fn}", fp.write)
-        if data_fn.endswith(".gz"):
-            with gzip.open(filepath, "rb") as f_in:
-                with open(source_dir / fn, "wb") as f_out:
-                    shutil.copyfileobj(f_in, f_out)
-            remove(filepath)
-
     def get_seqrepo(self, seqrepo_dir: Path) -> SeqRepo:
         """Return SeqRepo instance if seqrepo_dir exists.
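The `_get_data_handler` hook above exists so that sources whose wags-tails handler needs non-default construction can override it. A hypothetical sketch of such an override (this class is illustrative only; the abstract methods and `SourceName` enum registration a real source would need are omitted):

```python
from pathlib import Path
from typing import Optional

from wags_tails import NcbiGeneData

from gene.etl.base import Base


class CustomSource(Base):  # hypothetical edge-case source
    def _get_data_handler(self, data_path: Optional[Path] = None) -> NcbiGeneData:
        # Keep this source's files in a dedicated subdirectory rather than
        # relying on the DATA_DISPATCH default.
        subdir = data_path / "custom-source" if data_path else None
        return NcbiGeneData(data_dir=subdir, silent=self._silent)
```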
107 changes: 11 additions & 96 deletions src/gene/etl/ensembl.py

@@ -1,22 +1,14 @@
 """Defines the Ensembl ETL methods."""
 import logging
 import re
-from ftplib import FTP
-from json import dumps
-from pathlib import Path
 from typing import Dict
 
 import gffutils
-import requests
 from gffutils.feature import Feature
 
-from gene import APP_ROOT
 from gene.database import AbstractDatabase
 from gene.etl.base import Base
 from gene.etl.exceptions import (
-    GeneFileVersionError,
     GeneNormalizerEtlError,
-    GeneSourceFetchError,
 )
 from gene.schemas import NamespacePrefix, SourceMeta, SourceName, Strand
 
@@ -27,103 +19,26 @@
 class Ensembl(Base):
     """ETL the Ensembl source into the normalized database."""
 
-    def __init__(
-        self,
-        database: AbstractDatabase,
-        host: str = "ftp.ensembl.org",
-        data_dir: str = "pub/current_gff3/homo_sapiens/",
-        src_data_dir: Path = APP_ROOT / "data" / "ensembl",
-    ) -> None:
-        """Initialize Ensembl ETL class.
-        :param database: gene database for adding new data
-        :param host: FTP host name
-        :param data_dir: FTP data directory to use
-        :param src_data_dir: Data directory for Ensembl
-        """
-        super().__init__(database, host, data_dir, src_data_dir)
-        self._data_file_pattern = re.compile(r"ensembl_(GRCh\d+)_(\d+)\.gff3")
-        self._version = None
-        self._data_url = {}
-        self._assembly = None
-
-    def _is_up_to_date(self, data_file: Path) -> bool:
-        """Verify whether local data is up-to-date with latest available remote file.
-        :param data_file: path to latest local file
-        :return: True if data is up-to-date
-        :raise GeneFileVersionError: if unable to parse version number from local file
-        :raise GeneSourceFetchError: if unable to get latest version from remote source
-        """
-        local_match = re.match(self._data_file_pattern, data_file.name)
-        try:
-            version = int(local_match.groups()[1])
-        except (AttributeError, IndexError, ValueError):
-            raise GeneFileVersionError(
-                f"Unable to parse version number from local file: {data_file.absolute()}"
-            )
-
-        ensembl_api = (
-            "https://rest.ensembl.org/info/data/?content-type=application/json"
-        )
-        response = requests.get(ensembl_api)
-        if response.status_code != 200:
-            raise GeneSourceFetchError(
-                f"Unable to get response from Ensembl version API endpoint: {ensembl_api}"
-            )
-        releases = response.json().get("releases")
-        if not releases:
-            raise GeneSourceFetchError(
-                f"Malformed response from Ensembl version API endpoint: {dumps(response.json())}"
-            )
-        releases.sort()
-        return version == releases[-1]
-
-    def _download_data(self) -> Path:
-        """Download latest Ensembl GFF3 data file.
-        :return: path to acquired file
-        :raise GeneSourceFetchError: if unable to find file matching expected pattern
-        """
-        logger.info("Downloading latest Ensembl data file...")
-        pattern = r"Homo_sapiens\.(?P<assembly>GRCh\d+)\.(?P<version>\d+)\.gff3\.gz"
-        with FTP(self._host) as ftp:
-            ftp.login()
-            ftp.cwd(self._data_dir)
-            files = ftp.nlst()
-            for f in files:
-                match = re.match(pattern, f)
-                if match:
-                    resp = match.groupdict()
-                    assembly = resp["assembly"]
-                    version = resp["version"]
-                    new_fn = f"ensembl_{assembly}_{version}.gff3"
-                    self._ftp_download_file(ftp, f, self.src_data_dir, new_fn)
-                    logger.info(
-                        f"Successfully downloaded Ensembl {version} data to {self.src_data_dir / new_fn}."
-                    )
-                    return self.src_data_dir / new_fn
-        raise GeneSourceFetchError(
-            "Unable to find file matching expected Ensembl pattern via FTP"
-        )
-
     def _extract_data(self, use_existing: bool) -> None:
-        """Acquire Ensembl data file and get metadata.
+        """Acquire source data.
+        This method is responsible for initializing an instance of a data handler and,
+        in most cases, setting ``self._data_file`` and ``self._version``.
-        :param use_existing: if True, use latest available local file
+        :param use_existing: if True, don't try to fetch latest source data
         """
-        self._data_src = self._acquire_data_file(
-            "ensembl_*.gff3", use_existing, self._is_up_to_date, self._download_data
+        self._data_file, raw_version = self._data_source.get_latest(
+            from_local=use_existing
         )
-        match = re.match(self._data_file_pattern, self._data_src.name)
+        match = re.match(r"(GRCh\d+)_(\d+)", raw_version)
         self._assembly = match.groups()[0]
         self._version = match.groups()[1]
 
     def _transform_data(self) -> None:
         """Transform the Ensembl source."""
         logger.info("Transforming Ensembl...")
         db = gffutils.create_db(
-            str(self._data_src),
+            str(self._data_file),
             dbfn=":memory:",
             force=True,
             merge_strategy="create_unique",
@@ -255,7 +170,7 @@ def _add_meta(self) -> None:
         :raise GeneNormalizerEtlError: if requisite metadata is unset
         """
-        if not all([self._version, self._host, self._data_dir, self._assembly]):
+        if not self._version or not self._assembly:
             raise GeneNormalizerEtlError(
                 "Source metadata unavailable -- was data properly acquired before attempting to load DB?"
             )
@@ -265,7 +180,7 @@ def _add_meta(self) -> None:
             "/legal/disclaimer.html",
             version=self._version,
             data_url={
-                "genome_annotations": f"ftp://{self._host}/{self._data_dir}Homo_sapiens.{self._assembly}.{self._version}.gff3.gz"
+                "genome_annotations": f"ftp://ftp.ensembl.org/pub/release-{self._version}/gff3/homo_sapiens/Homo_sapiens.{self._assembly}.{self._version}.gff3.gz"
             },
             rdp_url=None,
             data_license_attributes={
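Worth noting: Ensembl's `_extract_data` now derives assembly and release from wags-tails' version string instead of parsing a local filename. A quick check of that parsing, assuming the version string is shaped like `GRCh38_110` (the exact value is produced by wags-tails and is an assumption here):

```python
import re

raw_version = "GRCh38_110"  # assumed shape of the wags-tails Ensembl version

match = re.match(r"(GRCh\d+)_(\d+)", raw_version)
assert match is not None
assembly, version = match.groups()
print(assembly, version)  # -> GRCh38 110
```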