From 4a4db24c8360c7748591515effa50de8d9a8346b Mon Sep 17 00:00:00 2001
From: James Stevenson
Date: Wed, 13 Dec 2023 16:20:58 -0500
Subject: [PATCH] use per-module loggers and leave logging config to consumers

Replace the shared "gene" logger, which hard-coded a DEBUG level at import
time, with per-module `logging.getLogger(__name__)` loggers named `_logger`,
leaving level and handler configuration to the consuming application. Swap
the `click.echo` progress messages in the ETL base class for log statements,
log progress during extraction and transformation, and report the record
count when a source load completes.
---
 src/gene/cli.py                 |  5 ----
 src/gene/database/dynamodb.py   | 43 +++++++++++++--------------------
 src/gene/database/postgresql.py | 22 ++++++++---------
 src/gene/etl/base.py            | 16 ++++++------
 src/gene/etl/ensembl.py         |  9 ++++---
 src/gene/etl/hgnc.py            |  9 +++----
 src/gene/etl/merge.py           | 21 ++++++++--------
 src/gene/etl/ncbi.py            | 19 +++++++++------
 src/gene/etl/update.py          |  2 +-
 9 files changed, 67 insertions(+), 79 deletions(-)

diff --git a/src/gene/cli.py b/src/gene/cli.py
index fc4e168c..9d852c0f 100644
--- a/src/gene/cli.py
+++ b/src/gene/cli.py
@@ -1,5 +1,4 @@
 """Provides a CLI util to make updates to normalizer database."""
-import logging
 import os
 from pathlib import Path
 from typing import Optional, Tuple
@@ -11,10 +10,6 @@
 from gene.etl.update import update_all_sources, update_normalized, update_source
 from gene.schemas import SourceName
 
-logger = logging.getLogger("gene")
-logger.setLevel(logging.DEBUG)
-
-
 url_description = 'URL endpoint for the application database. Can either be a URL to a local DynamoDB server (e.g. "http://localhost:8001") or a libpq-compliant PostgreSQL connection description (e.g. "postgresql://postgres:password@localhost:5432/gene_normalizer").'
 
 
diff --git a/src/gene/database/dynamodb.py b/src/gene/database/dynamodb.py
index e33c98ec..8d243005 100644
--- a/src/gene/database/dynamodb.py
+++ b/src/gene/database/dynamodb.py
@@ -31,7 +31,7 @@
     SourceName,
 )
 
-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
 
 
 class DynamoDbDatabase(AbstractDatabase):
@@ -165,7 +165,7 @@ def check_schema_initialized(self) -> bool:
         existing_tables = self.list_tables()
         exists = self.gene_table in existing_tables
         if not exists:
-            logger.info(f"{self.gene_table} table is missing or unavailable.")
+            _logger.info(f"{self.gene_table} table is missing or unavailable.")
         return exists
 
     def check_tables_populated(self) -> bool:
@@ -182,7 +182,7 @@ def check_tables_populated(self) -> bool:
             KeyConditionExpression=Key("item_type").eq("source"),
         ).get("Items", [])
         if len(sources) < len(SourceName):
-            logger.info("Gene sources table is missing expected sources.")
+            _logger.info("Gene sources table is missing expected sources.")
             return False
 
         records = self.genes.query(
@@ -191,7 +191,7 @@
             Limit=1,
         )
         if len(records.get("Items", [])) < 1:
-            logger.info("Gene records index is empty.")
+            _logger.info("Gene records index is empty.")
             return False
 
         normalized_records = self.genes.query(
@@ -200,7 +200,7 @@
             Limit=1,
         )
         if len(normalized_records.get("Items", [])) < 1:
-            logger.info("Normalized gene records index is empty.")
+            _logger.info("Normalized gene records index is empty.")
             return False
 
         return True
@@ -262,10 +262,8 @@ def get_record_by_id(
                 del record["label_and_type"]
             return record
         except ClientError as e:
-            logger.error(
-                f"boto3 client error on get_records_by_id for "
-                f"search term {concept_id}: "
-                f"{e.response['Error']['Message']}"
+            _logger.error(
+                f"boto3 client error on get_records_by_id for search term {concept_id}: {e.response['Error']['Message']}"
             )
             return None
         except (KeyError, IndexError):  # record doesn't exist
@@ -285,10 +283,8 @@ def get_refs_by_type(self, search_term: str, ref_type: RefType) -> List[str]:
             matches = self.genes.query(KeyConditionExpression=filter_exp)
             return [m["concept_id"] for m in matches.get("Items", None)]
         except ClientError as e:
-            logger.error(
-                f"boto3 client error on get_refs_by_type for "
-                f"search term {search_term}: "
-                f"{e.response['Error']['Message']}"
+            _logger.error(
+                f"boto3 client error on get_refs_by_type for search term {search_term}: {e.response['Error']['Message']}"
             )
             return []
@@ -389,9 +385,8 @@ def add_record(self, record: Dict, src_name: SourceName) -> None:
         try:
             self.batch.put_item(Item=record)
         except ClientError as e:
-            logger.error(
-                "boto3 client error on add_record for "
-                f"{concept_id}: {e.response['Error']['Message']}"
+            _logger.error(
+                f"boto3 client error on add_record for {concept_id}: {e.response['Error']['Message']}"
             )
         for attr_type, item_type in ITEM_TYPES.items():
             if attr_type in record:
@@ -421,9 +416,8 @@ def add_merged_record(self, record: Dict) -> None:
         try:
             self.batch.put_item(Item=record)
         except ClientError as e:
-            logger.error(
-                "boto3 client error on add_record for "
-                f"{concept_id}: {e.response['Error']['Message']}"
+            _logger.error(
+                f"boto3 client error on add_merged_record for {concept_id}: {e.response['Error']['Message']}"
             )
 
     def _add_ref_record(
@@ -447,10 +441,8 @@ def _add_ref_record(
         try:
             self.batch.put_item(Item=record)
         except ClientError as e:
-            logger.error(
-                f"boto3 client error adding reference {term} for "
-                f"{concept_id} with match type {ref_type}: "
-                f"{e.response['Error']['Message']}"
+            _logger.error(
+                f"boto3 client error adding reference {term} for {concept_id} with match type {ref_type}: {e.response['Error']['Message']}"
             )
 
     def update_merge_ref(self, concept_id: str, merge_ref: Any) -> None:  # noqa: ANN401
@@ -479,9 +471,8 @@ def update_merge_ref(self, concept_id: str, merge_ref: Any) -> None:  # noqa: ANN401
                     f"No such record exists for keys {label_and_type}, {concept_id}"
                 )
             else:
-                logger.error(
-                    f"boto3 client error in `database.update_record()`: "
-                    f"{e.response['Error']['Message']}"
+                _logger.error(
+                    f"boto3 client error in `database.update_record()`: {e.response['Error']['Message']}"
                 )
 
     def delete_normalized_concepts(self) -> None:
diff --git a/src/gene/database/postgresql.py b/src/gene/database/postgresql.py
index 82149cb9..8c187b91 100644
--- a/src/gene/database/postgresql.py
+++ b/src/gene/database/postgresql.py
@@ -27,7 +27,7 @@
 )
 from gene.schemas import RecordType, RefType, SourceMeta, SourceName
 
-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
 
 SCRIPTS_DIR = Path(__file__).parent / "postgresql"
 
@@ -124,7 +124,7 @@ def drop_db(self) -> None:
             with self.conn.cursor() as cur:
                 cur.execute(self._drop_db_query)
                 self.conn.commit()
-            logger.info("Dropped all existing gene normalizer tables.")
+            _logger.info("Dropped all existing gene normalizer tables.")
 
     def check_schema_initialized(self) -> bool:
         """Check if database schema is properly initialized.
@@ -137,7 +137,7 @@ def check_schema_initialized(self) -> bool:
         except DuplicateTable:
             self.conn.rollback()
         else:
-            logger.info("Gene table existence check failed.")
+            _logger.info("Gene table existence check failed.")
             self.conn.rollback()
             return False
         try:
@@ -146,7 +146,7 @@
         except DuplicateObject:
             self.conn.rollback()
         else:
-            logger.info("Gene foreign key existence check failed.")
+            _logger.info("Gene foreign key existence check failed.")
             self.conn.rollback()
             return False
         try:
@@ -157,7 +157,7 @@
         except DuplicateTable:
             self.conn.rollback()
         else:
-            logger.info("Gene normalized view lookup failed.")
+            _logger.info("Gene normalized view lookup failed.")
             self.conn.rollback()
             return False
         try:
@@ -166,7 +166,7 @@
         except DuplicateTable:
             self.conn.rollback()
         else:
-            logger.info("Gene indexes check failed.")
+            _logger.info("Gene indexes check failed.")
             self.conn.rollback()
             return False
 
@@ -189,21 +189,21 @@ def check_tables_populated(self) -> bool:
             cur.execute(self._check_sources_query)
             results = cur.fetchall()
             if len(results) < len(SourceName):
-                logger.info("Gene sources table is missing expected sources.")
+                _logger.info("Gene sources table is missing expected sources.")
                 return False
 
         with self.conn.cursor() as cur:
             cur.execute(self._check_concepts_query)
             result = cur.fetchone()
             if not result or result[0] < 1:
-                logger.info("Gene records table is empty.")
+                _logger.info("Gene records table is empty.")
                 return False
 
         with self.conn.cursor() as cur:
             cur.execute(self._check_merged_query)
             result = cur.fetchone()
             if not result or result[0] < 1:
-                logger.info("Normalized gene records table is empty.")
+                _logger.info("Normalized gene records table is empty.")
                 return False
 
         return True
@@ -265,7 +265,7 @@ def _drop_indexes(self) -> None:
 
     def _create_tables(self) -> None:
         """Create all tables, indexes, and views."""
-        logger.debug("Creating new gene normalizer tables.")
+        _logger.info("Creating new gene normalizer tables.")
 
         tables_query = (SCRIPTS_DIR / "create_tables.sql").read_bytes()
         with self.conn.cursor() as cur:
@@ -599,7 +599,7 @@ def add_record(self, record: Dict, src_name: SourceName) -> None:
                     cur.execute(self._ins_symbol_query, [record["symbol"], concept_id])
                 self.conn.commit()
         except UniqueViolation:
-            logger.error(f"Record with ID {concept_id} already exists")
+            _logger.error(f"Record with ID {concept_id} already exists")
             self.conn.rollback()
 
     _add_merged_record_query = b"""
diff --git a/src/gene/etl/base.py b/src/gene/etl/base.py
index f06c89a0..34d94c2b 100644
--- a/src/gene/etl/base.py
+++ b/src/gene/etl/base.py
@@ -6,7 +6,6 @@
 from pathlib import Path
 from typing import Dict, List, Optional, Union
 
-import click
 import pydantic
 from biocommons.seqrepo import SeqRepo
 from gffutils.feature import Feature
@@ -15,8 +14,7 @@
 from gene.database import AbstractDatabase
 from gene.schemas import ITEM_TYPES, Gene, GeneSequenceLocation, MatchType, SourceName
 
-logger = logging.getLogger("gene")
-logger.setLevel(logging.DEBUG)
+_logger = logging.getLogger(__name__)
 
 APP_ROOT = Path(__file__).resolve().parent
 
@@ -76,11 +74,11 @@ def perform_etl(self, use_existing: bool = False) -> List[str]:
             uploaded.
""" self._extract_data(use_existing) - if not self._silent: - click.echo("Transforming and loading data to DB...") + _logger.info(f"Transforming and loading {self._src_name} data to DB...") self._add_meta() self._transform_data() self._database.complete_write_transaction() + _logger.info(f"Data load complete for {self._src_name}.") return self._processed_ids def _extract_data(self, use_existing: bool) -> None: @@ -91,9 +89,11 @@ def _extract_data(self, use_existing: bool) -> None: :param bool use_existing: if True, don't try to fetch latest source data """ + _logger.info(f"Gathering {self._src_name} data...") self._data_file, self._version = self._data_source.get_latest( from_local=use_existing ) + _logger.info(f"Acquired data for {self._src_name}: {self._data_file}") @abstractmethod def _transform_data(self) -> None: @@ -116,7 +116,7 @@ def _load_gene(self, gene: Dict) -> None: try: assert Gene(match_type=MatchType.NO_MATCH, **gene) except pydantic.ValidationError as e: - logger.warning(f"Unable to load {gene} due to validation error: " f"{e}") + _logger.warning(f"Unable to load {gene} due to validation error: " f"{e}") else: concept_id = gene["concept_id"] gene["label_and_type"] = f"{concept_id.lower()}##identity" @@ -217,7 +217,7 @@ def _get_seq_id_aliases(self, seq_id: str) -> List[str]: try: aliases = self.seqrepo.translate_alias(seq_id, target_namespaces="ga4gh") except KeyError as e: - logger.warning(f"SeqRepo raised KeyError: {e}") + _logger.warning(f"SeqRepo raised KeyError: {e}") return aliases def _get_sequence_location(self, seq_id: str, gene: Feature, params: Dict) -> Dict: @@ -244,7 +244,7 @@ def _get_sequence_location(self, seq_id: str, gene: Feature, params: Dict) -> Di sequence_id=sequence, ).model_dump() # type: ignore else: - logger.warning( + _logger.warning( f"{params['concept_id']} has invalid interval:" f"start={gene.start - 1} end={gene.end}" ) # type: ignore diff --git a/src/gene/etl/ensembl.py b/src/gene/etl/ensembl.py index 4a52975a..d63d257f 100644 --- a/src/gene/etl/ensembl.py +++ b/src/gene/etl/ensembl.py @@ -12,8 +12,7 @@ ) from gene.schemas import NamespacePrefix, SourceMeta, SourceName, Strand -logger = logging.getLogger("gene") -logger.setLevel(logging.DEBUG) +_logger = logging.getLogger(__name__) class Ensembl(Base): @@ -27,16 +26,18 @@ def _extract_data(self, use_existing: bool) -> None: :param use_existing: if True, don't try to fetch latest source data """ + _logger.info("Gathering Ensembl data...") self._data_file, raw_version = self._data_source.get_latest( from_local=use_existing ) match = re.match(r"(GRCh\d+)_(\d+)", raw_version) self._assembly = match.groups()[0] self._version = match.groups()[1] + _logger.info(f"Acquired data for Ensembl: {self._data_file}") def _transform_data(self) -> None: """Transform the Ensembl source.""" - logger.info("Transforming Ensembl...") + _logger.info("Transforming Ensembl data...") db = gffutils.create_db( str(self._data_file), dbfn=":memory:", @@ -59,7 +60,7 @@ def _transform_data(self) -> None: gene = self._add_gene(f, accession_numbers) if gene: self._load_gene(gene) - logger.info("Successfully transformed Ensembl.") + _logger.info("Ensembl data transform complete.") def _add_gene(self, f: Feature, accession_numbers: Dict) -> Dict: """Create a transformed gene record. 
diff --git a/src/gene/etl/hgnc.py b/src/gene/etl/hgnc.py
index 9e4c048e..e021fd24 100644
--- a/src/gene/etl/hgnc.py
+++ b/src/gene/etl/hgnc.py
@@ -18,8 +18,7 @@
     SymbolStatus,
 )
 
-logger = logging.getLogger("gene")
-logger.setLevel(logging.DEBUG)
+_logger = logging.getLogger(__name__)
 
 
 class HGNC(Base):
@@ -27,7 +26,7 @@ class HGNC(Base):
 
     def _transform_data(self) -> None:
         """Transform the HGNC source."""
-        logger.info("Transforming HGNC...")
+        _logger.info("Transforming HGNC data...")
         with open(self._data_file, "r") as f:  # type: ignore
             data = json.load(f)
 
@@ -58,7 +57,7 @@ def _transform_data(self) -> None:
                 if "locus_type" in r:
                     gene["gene_type"] = r["locus_type"]
                 self._load_gene(gene)
-        logger.info("Successfully transformed HGNC.")
+        _logger.info("HGNC data transform complete.")
 
     def _get_aliases(self, r: Dict, gene: Dict) -> None:
         """Store aliases in a gene record.
@@ -139,7 +138,7 @@ def _get_xrefs_associated_with(self, r: Dict, gene: Dict) -> None:
                 else:
                     self._get_xref_associated_with(key, src, r, associated_with)
             else:
-                logger.warning(f"{key} not in schemas.py")
+                _logger.warning(f"{key} not in schemas.py")
 
         if xrefs:
             gene["xrefs"] = xrefs
diff --git a/src/gene/etl/merge.py b/src/gene/etl/merge.py
index 8124d294..651ce8ae 100644
--- a/src/gene/etl/merge.py
+++ b/src/gene/etl/merge.py
@@ -7,8 +7,7 @@
 from gene.database.database import DatabaseWriteException
 from gene.schemas import GeneTypeFieldName, RecordType, SourcePriority
 
-logger = logging.getLogger("gene")
-logger.setLevel(logging.DEBUG)
+_logger = logging.getLogger(__name__)
 
 
 class Merge:
@@ -28,7 +27,7 @@ def create_merged_concepts(self, record_ids: Set[str]) -> None:
 
         :param record_ids: concept identifiers from which groups should be generated.
             Should *not* include any records from excluded sources.
""" - logger.info("Generating record ID sets...") + _logger.info("Generating record ID sets...") start = timer() for record_id in record_ids: new_group = self._create_record_id_set(record_id) @@ -36,11 +35,11 @@ def create_merged_concepts(self, record_ids: Set[str]) -> None: for concept_id in new_group: self._groups[concept_id] = new_group end = timer() - logger.debug(f"Built record ID sets in {end - start} seconds") + _logger.debug(f"Built record ID sets in {end - start} seconds") self._groups = {k: v for k, v in self._groups.items() if len(v) > 1} - logger.info("Creating merged records and updating database...") + _logger.info("Creating merged records and updating database...") uploaded_ids = set() start = timer() for record_id, group in self._groups.items(): @@ -58,17 +57,17 @@ def create_merged_concepts(self, record_ids: Set[str]) -> None: self._database.update_merge_ref(concept_id, merge_ref) except DatabaseWriteException as dw: if str(dw).startswith("No such record exists"): - logger.error( + _logger.error( f"Updating nonexistent record: {concept_id} " f"for merge ref to {merge_ref}" ) else: - logger.error(str(dw)) + _logger.error(str(dw)) uploaded_ids |= group self._database.complete_write_transaction() - logger.info("Merged concept generation successful.") + _logger.info("Merged concept generation successful.") end = timer() - logger.debug(f"Generated and added concepts in {end - start} seconds") + _logger.debug(f"Generated and added concepts in {end - start} seconds") def _create_record_id_set( self, record_id: str, observed_id_set: Optional[Set] = None @@ -88,7 +87,7 @@ def _create_record_id_set( else: db_record = self._database.get_record_by_id(record_id) if not db_record: - logger.warning( + _logger.warning( f"Record ID set creator could not resolve " f"lookup for {record_id} in ID set: " f"{observed_id_set}" @@ -124,7 +123,7 @@ def _generate_merged_record(self, record_id_set: Set[str]) -> Dict: if record: records.append(record) else: - logger.error( + _logger.error( f"Merge record generator could not retrieve " f"record for {record_id} in {record_id_set}" ) diff --git a/src/gene/etl/ncbi.py b/src/gene/etl/ncbi.py index a136c4b2..db0f8750 100644 --- a/src/gene/etl/ncbi.py +++ b/src/gene/etl/ncbi.py @@ -24,8 +24,7 @@ SymbolStatus, ) -logger = logging.getLogger("gene") -logger.setLevel(logging.DEBUG) +_logger = logging.getLogger(__name__) class NCBI(Base): @@ -53,6 +52,7 @@ def _extract_data(self, use_existing: bool) -> None: :param use_existing: if True, use latest available local file """ + _logger.info(f"Gathering {self._src_name} data...") self._gff_src, self._assembly = self._genome_data_handler.get_latest( from_local=use_existing ) @@ -65,6 +65,9 @@ def _extract_data(self, use_existing: bool) -> None: self._gene_url = "ftp.ncbi.nlm.nih.gov/gene/DATA/GENE_INFO/Mammalia/Homo_sapiens.gene_info.gz" self._history_url = "ftp.ncbi.nlm.nih.gov/gene/DATA/gene_history.gz" self._assembly_url = "ftp.ncbi.nlm.nih.gov/genomes/refseq/vertebrate_mammalian/Homo_sapiens/latest_assembly_versions/" + _logger.info( + f"Acquired data for {self._src_name}: {self._gff_src}, {self._info_src}, {self._history_src}" + ) def _get_prev_symbols(self) -> Dict[str, str]: """Store a gene's symbol history. 
@@ -128,7 +131,7 @@ def _add_xrefs_associated_with(self, val: List[str], params: Dict) -> None:
                 if prefix:
                     params["associated_with"].append(f"{prefix}:{src_id}")
                 else:
-                    logger.info(f"{src_name} is not in NameSpacePrefix.")
+                    _logger.info(f"{src_name} is not in NamespacePrefix.")
         if not params["xrefs"]:
             del params["xrefs"]
         if not params["associated_with"]:
@@ -322,7 +325,7 @@ def _set_chromsomes_locations(self, row: List[str], params: Dict) -> Dict:
 
         if len(chromosomes) >= 2:
             if chromosomes and "X" not in chromosomes and "Y" not in chromosomes:
-                logger.info(
+                _logger.info(
                     f"{row[2]} contains multiple distinct "
                     f"chromosomes: {chromosomes}."
                 )
@@ -348,7 +351,7 @@ def _set_chromsomes_locations(self, row: List[str], params: Dict) -> Dict:
             # Exclude genes where there are multiple distinct locations
             # i.e. OMS: '10q26.3', '19q13.42-q13.43', '3p25.3'
             if len(locations) > 2:
-                logger.info(
+                _logger.info(
                     f"{row[2]} contains multiple distinct " f"locations: {locations}."
                 )
                 locations = None
@@ -359,7 +362,7 @@ def _set_chromsomes_locations(self, row: List[str], params: Dict) -> Dict:
             for i in range(len(locations)):
                 loc = locations[i].strip()
                 if not re.match("^([1-9][0-9]?|X[pq]?|Y[pq]?)", loc):
-                    logger.info(
+                    _logger.info(
                         f"{row[2]} contains invalid map location:" f"{loc}."
                     )
                     params["location_annotations"].append(loc)
@@ -447,7 +450,7 @@ def _set_centromere_location(self, loc: str, location: Dict) -> None:
 
     def _transform_data(self) -> None:
         """Modify data and pass to loading functions."""
-        logger.info("Transforming NCBI...")
+        _logger.info("Transforming NCBI data...")
         prev_symbols = self._get_prev_symbols()
         info_genes = self._get_gene_info(prev_symbols)
 
@@ -464,7 +467,7 @@ def _transform_data(self) -> None:
 
         for gene in info_genes.keys():
             self._load_gene(info_genes[gene])
-        logger.info("Successfully transformed NCBI.")
+        _logger.info("NCBI data transform complete.")
 
     def _add_meta(self) -> None:
         """Add Ensembl metadata.
diff --git a/src/gene/etl/update.py b/src/gene/etl/update.py
index 489ccd70..ed8dac8e 100644
--- a/src/gene/etl/update.py
+++ b/src/gene/etl/update.py
@@ -84,7 +84,7 @@ def load_source(
             click.get_current_context().exit()
     end_load = timer()
     load_time = end_load - start_load
-    msg = f"Loaded {source.value} in {load_time:.5f} seconds."
+    msg = f"Loaded {len(processed_ids)} records from {source.value} in {load_time:.5f} seconds."
    if not silent:
         click.echo(msg)
     _logger.info(msg)
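
Note: with this patch the library no longer sets log levels at import time, so
verbosity is controlled entirely by the consuming application. A minimal sketch
of what that configuration might look like (the handler and format choices here
are illustrative, not part of this patch; only the "gene" logger namespace
comes from the package itself):

    import logging

    # Attach a basic stderr handler and message format to the root logger.
    logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s: %(message)s")

    # Opt in to DEBUG output from the gene normalizer only. Because every
    # module now uses logging.getLogger(__name__), all of its loggers
    # (e.g. "gene.etl.ncbi") inherit from the "gene" parent logger, while
    # other libraries keep the root logger's default WARNING level.
    logging.getLogger("gene").setLevel(logging.DEBUG)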