Bring in the latest Develop #10

Merged 11 commits on Feb 7, 2024
7 changes: 7 additions & 0 deletions activitysim/core/configuration/top.py
@@ -3,6 +3,8 @@
from pathlib import Path
from typing import Any, Literal

from pydantic import validator

from activitysim.core.configuration.base import PydanticBase, Union


@@ -119,6 +121,11 @@ class OutputTables(PydanticBase):
h5_store: bool = False
"""Write tables into a single HDF5 store instead of individual CSVs."""

file_type: Literal["csv", "parquet", "h5"] = "csv"
"""
Specifies the file type for output tables. Options are limited to 'csv',
'h5' or 'parquet'. Only applied if h5_store is set to False."""

action: str
"""Whether to 'include' or 'skip' the enumerated tables in `tables`."""

33 changes: 28 additions & 5 deletions activitysim/core/steps/output.py
@@ -9,6 +9,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.parquet as parquet

from activitysim.core import configuration, workflow
from activitysim.core.workflow.checkpoint import CHECKPOINT_NAME
@@ -226,8 +227,13 @@ def write_data_dictionary(state: workflow.State) -> None:
@workflow.step
def write_tables(state: workflow.State) -> None:
"""
Write pipeline tables (in the output directory) as csv or parquet files, as
specified by the output_tables list in the settings file. Output to a single
h5 file is also supported.

'h5_store' defaults to False, which means the output will be written to csv.
'file_type' defaults to 'csv' but can also be set to 'parquet' or 'h5'.
When 'h5_store' is set to True, 'file_type' is ignored and the outputs are written to h5.
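
For example, to write every listed table into a single h5 store (same
settings layout as the examples below):

::

    output_tables:
      file_type: h5
      action: include
      tables:
        - households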

'output_tables' can specify either a list of output tables to include or to skip.
If no output_tables list is specified, then all checkpointed tables will be written.
@@ -261,6 +267,16 @@ def write_tables(state: workflow.State) -> None:
      tables:
        - households

To write tables to parquet files, use the file_type setting:

::

    output_tables:
      file_type: parquet
      action: include
      tables:
        - households
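
Once written, a parquet table can be read back directly (a usage sketch;
the 'final_' prefix below is ActivitySim's default output prefix, and the
output directory name is assumed):

::

    import pandas as pd

    households = pd.read_parquet("output/final_households.parquet")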

Parameters
----------
output_dir: str
@@ -277,6 +293,7 @@ def write_tables(state: workflow.State) -> None:
tables = output_tables_settings.tables
prefix = output_tables_settings.prefix
h5_store = output_tables_settings.h5_store
file_type = output_tables_settings.file_type
sort = output_tables_settings.sort

registered_tables = state.registered_tables()
@@ -388,14 +405,20 @@ def map_func(x):
):
dt = dt.drop([f"_original_{lookup_col}"])

if h5_store or file_type == "h5":
    file_path = state.get_output_file_path("%soutput_tables.h5" % prefix)
    dt.to_pandas().to_hdf(
        str(file_path), key=table_name, mode="a", format="fixed"
    )

else:
    file_name = f"{prefix}{table_name}.{file_type}"
    file_path = state.get_output_file_path(file_name)

    # include the index if it has a name or is a MultiIndex
    if file_type == "csv":
        csv.write_csv(dt, file_path)
    elif file_type == "parquet":
        parquet.write_table(dt, file_path)
    else:
        raise ValueError(f"unknown file_type {file_type}")
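
In the h5 branch above, each table is appended to a single store under its own key (key=table_name, mode="a"), so one file accumulates every output table. A quick spot-check of one table, assuming the default 'final_' prefix:

    import pandas as pd

    households = pd.read_hdf("output/final_output_tables.h5", key="households")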