From d9e766e3b93f336506c2f1512573f8ebc80c0da4 Mon Sep 17 00:00:00 2001
From: Matt Fisher
Date: Wed, 17 Jul 2024 15:03:00 -0600
Subject: [PATCH 1/4] WIP add logging, progress bar

NOTE: Progress bar finishes early! What happens after that?
---
 pyproject.toml                       |  1 +
 src/aross_stations_db/source_data.py | 17 +++++++++++++----
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 83877b3..19eacbe 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,6 +23,7 @@ classifiers = [
 dynamic = ["version"]
 dependencies = [
   "loguru",
+  "tqdm",
   "fastapi ~=0.111.0",
   "pydantic ~=2.0",
   "pydantic-settings",
diff --git a/src/aross_stations_db/source_data.py b/src/aross_stations_db/source_data.py
index 8b0cc1f..64cf35f 100644
--- a/src/aross_stations_db/source_data.py
+++ b/src/aross_stations_db/source_data.py
@@ -3,18 +3,27 @@
 from collections.abc import Iterator
 from pathlib import Path
 
+from loguru import logger
+from tqdm import tqdm
+
 
 def get_stations(metadata_fp: Path) -> list[dict[str, str]]:
     stations_metadata_str = metadata_fp.read_text()
-    return list(csv.DictReader(io.StringIO(stations_metadata_str)))
+    stations_metadata = list(csv.DictReader(io.StringIO(stations_metadata_str)))
+
+    logger.info(f"Found {len(stations_metadata)} stations")
+    return stations_metadata
+
+
+def get_event_files(events_dir: Path) -> list[Path]:
+    event_files = list(events_dir.glob("*.event.csv"))
 
-def get_event_files(events_dir: Path) -> Iterator[Path]:
-    return events_dir.glob("*.event.csv")
+    logger.info(f"Found {len(event_files)} event files")
+    return event_files
 
 
 def get_events(events_dir: Path) -> Iterator[dict[str, str]]:
-    for event_fp in get_event_files(events_dir):
+    for event_fp in tqdm(get_event_files(events_dir)):
         station_id = event_fp.stem.split(".")[0]
 
         with event_fp.open() as event_file:

From 5ff8b7038d3976303c46d540360ff05962560c31 Mon Sep 17 00:00:00 2001
From: Matt Fisher
Date: Wed, 17 Jul 2024 17:53:47 -0600
Subject: [PATCH 2/4] Improve logging and use different bulk insert method

---
 src/aross_stations_db/cli.py         | 21 ++++++++++---
 src/aross_stations_db/db/setup.py    | 46 ++++++++++++++++++----------
 src/aross_stations_db/source_data.py |  3 +-
 3 files changed, 48 insertions(+), 22 deletions(-)

diff --git a/src/aross_stations_db/cli.py b/src/aross_stations_db/cli.py
index dc0cace..669ad4a 100644
--- a/src/aross_stations_db/cli.py
+++ b/src/aross_stations_db/cli.py
@@ -1,9 +1,11 @@
 import click
 from loguru import logger
 from sqlalchemy.orm import Session
+from tqdm import tqdm
 
 from aross_stations_db.config import CliLoadSettings, Settings
 from aross_stations_db.db.setup import (
+    generate_event_objects,
     load_events,
     load_stations,
     recreate_tables,
@@ -39,11 +41,22 @@ def load() -> None:
     # See: https://github.com/pydantic/pydantic/issues/6713
     config = CliLoadSettings()  # type:ignore[call-arg]
 
-    stations = get_stations(config.stations_metadata_filepath)
-    events = get_events(config.events_dir)
+    raw_stations = get_stations(config.stations_metadata_filepath)
+    raw_events = get_events(config.events_dir)
 
     with Session(config.db_engine) as db_session:
-        load_stations(stations, session=db_session)
+        load_stations(raw_stations, session=db_session)
+        logger.info("Loaded stations")
+
+        # The event processing steps are split into stages to provide better feedback at
+        # runtime. On slower systems, it can be unclear what the bottleneck is. In the
+        # long run, we should try to optimize this after learning more.
+        events = generate_event_objects(
+            tqdm(raw_events, desc="Reading events"),
+        )
+
+        logger.info("Loading events; this can take a minute or so")
         load_events(events, session=db_session)
+        logger.info("Loaded events")
 
-    logger.success("Data loaded")
+    logger.success("Database load complete")
diff --git a/src/aross_stations_db/db/setup.py b/src/aross_stations_db/db/setup.py
index 2382652..732e360 100644
--- a/src/aross_stations_db/db/setup.py
+++ b/src/aross_stations_db/db/setup.py
@@ -1,7 +1,7 @@
 import datetime as dt
 from collections.abc import Iterator
 
-from sqlalchemy import MetaData
+from sqlalchemy import MetaData, insert
 from sqlalchemy.orm import Session
 
 from aross_stations_db.db.tables import Base, Event, Station
@@ -65,22 +65,36 @@ def load_stations(stations: list[dict[str, str]], *, session: Session) -> None:
     session.commit()
 
 
-def load_events(events: Iterator[dict[str, str]], *, session: Session) -> None:
-    session.add_all(
-        [
-            Event(
-                station_id=event["station_id"],
-                time_start=dt.datetime.fromisoformat(event["start"]),
-                time_end=dt.datetime.fromisoformat(event["end"]),
-                snow_on_ground=_snow_on_ground_status(event["sog"]),
-                rain_hours=int(event["RA"]),
-                freezing_rain_hours=int(event["FZRA"]),
-                solid_precipitation_hours=int(event["SOLID"]),
-                unknown_precipitation_hours=int(event["UP"]),
-            )
-            for event in events
-        ]
+def generate_event_objects(raw_events: Iterator[dict[str, str]]) -> list[Event]:
+    return [
+        Event(
+            station_id=event["station_id"],
+            time_start=dt.datetime.fromisoformat(event["start"]),
+            time_end=dt.datetime.fromisoformat(event["end"]),
+            snow_on_ground=_snow_on_ground_status(event["sog"]),
+            rain_hours=int(event["RA"]),
+            freezing_rain_hours=int(event["FZRA"]),
+            solid_precipitation_hours=int(event["SOLID"]),
+            unknown_precipitation_hours=int(event["UP"]),
+        )
+        for event in raw_events
+    ]
+
+
+def load_events(events: list[Event], *, session: Session) -> None:
+    """Load events into the database.
+
+    Trying to follow the bulk load instructions, but it's hard to tell why this step
+    takes as long as it does. When using tqdm to monitor progress, things "stall" for
+    some time after the iterable is consumed. I expected this would not happen because
+    of under-the-hood batching, so I'm not really sure how to make this more
+    performant, or if we can.
+    """
+    session.execute(
+        insert(Event),
+        [event.__dict__ for event in events],
     )
+    session.commit()
diff --git a/src/aross_stations_db/source_data.py b/src/aross_stations_db/source_data.py
index 64cf35f..a3429ae 100644
--- a/src/aross_stations_db/source_data.py
+++ b/src/aross_stations_db/source_data.py
@@ -4,7 +4,6 @@
 from pathlib import Path
 
 from loguru import logger
-from tqdm import tqdm
 
 
 def get_stations(metadata_fp: Path) -> list[dict[str, str]]:
@@ -23,7 +22,7 @@ def get_event_files(events_dir: Path) -> list[Path]:
 
 
 def get_events(events_dir: Path) -> Iterator[dict[str, str]]:
-    for event_fp in tqdm(get_event_files(events_dir)):
+    for event_fp in get_event_files(events_dir):
         station_id = event_fp.stem.split(".")[0]
 
         with event_fp.open() as event_file:

From f8773f2ed13aaa905502139501833a245a41b4c9 Mon Sep 17 00:00:00 2001
From: Matt Fisher
Date: Wed, 17 Jul 2024 18:04:01 -0600
Subject: [PATCH 3/4] Combine CLI commands into one

---
 README.md                    |  3 +--
 src/aross_stations_db/cli.py | 25 +++++++++++++------------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/README.md b/README.md
index 53e16ac..f23d41f 100644
--- a/README.md
+++ b/README.md
@@ -153,8 +153,7 @@ browser to `http://localhost:8080` and enter:
 ### Run ingest
 
 ```bash
-docker compose run cli init  # Create empty tables (deleting any pre-existing ones)
-docker compose run cli load  # Load the tables from event files
+docker compose run cli init
 ```
 
 From a fast disk, this should take under 2 minutes.
diff --git a/src/aross_stations_db/cli.py b/src/aross_stations_db/cli.py
index 669ad4a..d54abed 100644
--- a/src/aross_stations_db/cli.py
+++ b/src/aross_stations_db/cli.py
@@ -3,7 +3,7 @@
 from sqlalchemy.orm import Session
 from tqdm import tqdm
 
-from aross_stations_db.config import CliLoadSettings, Settings
+from aross_stations_db.config import CliLoadSettings
 from aross_stations_db.db.setup import (
     generate_event_objects,
     load_events,
@@ -21,25 +21,26 @@ def cli() -> None:
     pass
 
 
+@click.option(
+    "--skip-load",
+    help="Skip loading data; only initialize tables.",
+    is_flag=True,
+)
 @cli.command
-def init() -> None:
-    """Create the database tables, dropping any that pre-exist."""
+def init(skip_load: bool = False) -> None:
+    """Initialize the database tables and load data from files on disk."""
     # TODO: False-positive. Remove type-ignore.
     # See: https://github.com/pydantic/pydantic/issues/6713
-    config = Settings()  # type:ignore[call-arg]
+    config = CliLoadSettings()  # type:ignore[call-arg]
 
     with Session(config.db_engine) as db_session:
         recreate_tables(db_session)
-    logger.success("Database initialized")
+        logger.info("Database tables initialized")
 
-
-@cli.command
-def load() -> None:
-    """Load the database tables from files on disk."""
-    # TODO: False-positive. Remove type-ignore.
-    # See: https://github.com/pydantic/pydantic/issues/6713
-    config = CliLoadSettings()  # type:ignore[call-arg]
+    if skip_load:
+        logger.warning("Skipping data load.")
+        return
 
     raw_stations = get_stations(config.stations_metadata_filepath)
     raw_events = get_events(config.events_dir)

From 06a98c8ca680ae901c295c68c9feb84727ed3136 Mon Sep 17 00:00:00 2001
From: Matt Fisher
Date: Wed, 17 Jul 2024 18:21:29 -0600
Subject: [PATCH 4/4] Refactor progress bar to cli level

---
 pyproject.toml                    |  1 +
 src/aross_stations_db/cli.py      |  9 +++++----
 src/aross_stations_db/db/setup.py | 26 +++++++++++---------------
 3 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 19eacbe..83b0029 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,6 +42,7 @@ test = [
   "pytest >=6",
   "pytest-cov >=3",
   "mypy >=1.10",
+  "types-tqdm",
 ]
 dev = [
   "pytest >=6",
diff --git a/src/aross_stations_db/cli.py b/src/aross_stations_db/cli.py
index d54abed..5e0ffdb 100644
--- a/src/aross_stations_db/cli.py
+++ b/src/aross_stations_db/cli.py
@@ -5,7 +5,7 @@
 
 from aross_stations_db.config import CliLoadSettings
 from aross_stations_db.db.setup import (
-    generate_event_objects,
+    generate_event_object,
     load_events,
     load_stations,
     recreate_tables,
@@ -52,10 +52,11 @@ def init(skip_load: bool = False) -> None:
         # The event processing steps are split into stages to provide better feedback at
         # runtime. On slower systems, it can be unclear what the bottleneck is. In the
         # long run, we should try to optimize this after learning more.
-        events = generate_event_objects(
-            tqdm(raw_events, desc="Reading events"),
-        )
+        events = [
+            generate_event_object(e) for e in tqdm(raw_events, desc="Reading events")
+        ]
 
+        # TODO: Is there any way we can monitor this process with a progress bar?
         logger.info("Loading events; this can take a minute or so")
         load_events(events, session=db_session)
         logger.info("Loaded events")
diff --git a/src/aross_stations_db/db/setup.py b/src/aross_stations_db/db/setup.py
index 732e360..5bb91ab 100644
--- a/src/aross_stations_db/db/setup.py
+++ b/src/aross_stations_db/db/setup.py
@@ -1,5 +1,4 @@
 import datetime as dt
-from collections.abc import Iterator
 
 from sqlalchemy import MetaData, insert
 from sqlalchemy.orm import Session
@@ -65,20 +64,17 @@ def load_stations(stations: list[dict[str, str]], *, session: Session) -> None:
     session.commit()
 
 
-def generate_event_objects(raw_events: Iterator[dict[str, str]]) -> list[Event]:
-    return [
-        Event(
-            station_id=event["station_id"],
-            time_start=dt.datetime.fromisoformat(event["start"]),
-            time_end=dt.datetime.fromisoformat(event["end"]),
-            snow_on_ground=_snow_on_ground_status(event["sog"]),
-            rain_hours=int(event["RA"]),
-            freezing_rain_hours=int(event["FZRA"]),
-            solid_precipitation_hours=int(event["SOLID"]),
-            unknown_precipitation_hours=int(event["UP"]),
-        )
-        for event in raw_events
-    ]
+def generate_event_object(raw_event: dict[str, str]) -> Event:
+    return Event(
+        station_id=raw_event["station_id"],
+        time_start=dt.datetime.fromisoformat(raw_event["start"]),
+        time_end=dt.datetime.fromisoformat(raw_event["end"]),
+        snow_on_ground=_snow_on_ground_status(raw_event["sog"]),
+        rain_hours=int(raw_event["RA"]),
+        freezing_rain_hours=int(raw_event["FZRA"]),
+        solid_precipitation_hours=int(raw_event["SOLID"]),
+        unknown_precipitation_hours=int(raw_event["UP"]),
+    )
 
 
 def load_events(events: list[Event], *, session: Session) -> None:
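
A note on the two progress-bar questions left open above (patch 1: "Progress bar finishes early! What happens after that?"; patch 4: "Is there any way we can monitor this process with a progress bar?"): `get_events` yields rows lazily, so a bar wrapped around it, or around the `generate_event_object` calls, completes as soon as the last event file has been read, while the actual INSERT work is still ahead. `load_events` then hands the whole parameter list to `session.execute(insert(Event), ...)` as one bulk operation with nothing to hook a bar onto, which is the "stall" described in its docstring. One possible answer, sketched below, is to insert the rows in fixed-size batches and advance the bar once per batch. This sketch is not part of the patches above: `load_events_in_batches`, `_batches`, and the default batch size are illustrative, and it assumes the rows are built as plain dicts of column values (note that `event.__dict__` on an ORM instance also carries SQLAlchemy's internal `_sa_instance_state` entry).

```python
from collections.abc import Iterator

from sqlalchemy import insert
from sqlalchemy.orm import Session
from tqdm import tqdm

from aross_stations_db.db.tables import Event


def _batches(
    rows: list[dict[str, object]], size: int
) -> Iterator[list[dict[str, object]]]:
    """Yield successive slices of `rows`, each at most `size` long."""
    for start in range(0, len(rows), size):
        yield rows[start : start + size]


def load_events_in_batches(
    rows: list[dict[str, object]],
    *,
    session: Session,
    batch_size: int = 10_000,
) -> None:
    """Bulk-insert event rows one batch at a time.

    Each `session.execute()` call below is a single executemany round-trip,
    so the progress bar tracks database work rather than list construction.
    """
    with tqdm(total=len(rows), desc="Loading events", unit="rows") as progress:
        for batch in _batches(rows, batch_size):
            session.execute(insert(Event), batch)
            progress.update(len(batch))
    session.commit()
```

Batching also bounds how much the driver must buffer per statement; whether it improves wall-clock time depends on the backend, but it at least makes the load observable batch by batch instead of one long silent wait.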