Skip to content

Commit

Permalink
Add parquet_file option in set_table_in_survey
Browse files Browse the repository at this point in the history
  • Loading branch information
clallemand committed Jan 2, 2024
1 parent ab85e4f commit 3acd43b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 14 deletions.
30 changes: 22 additions & 8 deletions openfisca_survey_manager/input_dataframe_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ def randomly_init_variable(tax_benefit_system, input_dataframe_by_entity, variab


def set_table_in_survey(input_dataframe, entity, period, collection, survey_name, survey_label = None,
table_label = None, table_name = None, config_files_directory = default_config_files_directory):
table_label = None, table_name = None, config_files_directory = default_config_files_directory,
source_format = None, parquet_file = None):
period = periods.period(period)
if table_name is None:
table_name = entity + '_' + str(period)
Expand All @@ -186,7 +187,7 @@ def set_table_in_survey(input_dataframe, entity, period, collection, survey_name
survey_collection = SurveyCollection.load(collection = collection, config_files_directory=config_files_directory)
except configparser.NoOptionError as e:
log.warning(f"set_table_in_survey configparser.NoOptionError : {e}")
survey_collection = SurveyCollection(name = collection)
survey_collection = SurveyCollection(name = collection, config_files_directory=config_files_directory)
except configparser.NoSectionError as e: # For tests
log.warning(f"set_table_in_survey configparser.NoSectionError : {e}")
data_dir = os.path.join(
Expand All @@ -199,6 +200,9 @@ def set_table_in_survey(input_dataframe, entity, period, collection, survey_name
name = collection,
config_files_directory = data_dir,
)
except FileNotFoundError as e:
log.warning(f"set_table_in_survey FileNotFoundError : {e}")
survey_collection = SurveyCollection(name = collection, config_files_directory=config_files_directory)

try:
survey = survey_collection.get_survey(survey_name)
Expand All @@ -210,17 +214,27 @@ def set_table_in_survey(input_dataframe, entity, period, collection, survey_name
survey_collection = survey_collection,
)

if survey.hdf5_file_path is None:
if survey.hdf5_file_path is None and survey.parquet_file_path is None:
config = survey.survey_collection.config
directory_path = config.get("data", "output_directory")
if not os.path.isdir(directory_path):
log.warning("{} who should be the HDF5 data directory does not exist: we create the directory".format(
log.warning("{} who should be the data directory does not exist: we create the directory".format(
directory_path))
os.makedirs(directory_path)
survey.hdf5_file_path = os.path.join(directory_path, survey.name + '.h5')

assert survey.hdf5_file_path is not None
survey.insert_table(label = table_label, name = table_name, dataframe = input_dataframe)
if source_format is None:
survey.hdf5_file_path = os.path.join(directory_path, survey.name + '.h5')
elif source_format == "parquet":
survey.parquet_file_path = os.path.join(directory_path, survey.name)
if not os.path.isdir(survey.parquet_file_path):
log.warning("{} who should be the parquet data directory does not exist: we create the directory".format(
survey.parquet_file_path))
os.makedirs(survey.parquet_file_path)


assert (survey.hdf5_file_path is not None) or (survey.parquet_file_path is not None)
if source_format == "parquet" and parquet_file is None:
parquet_file = os.path.join(directory_path, survey.name + '.parquet')
survey.insert_table(label = table_label, name = table_name, dataframe = input_dataframe, parquet_file = parquet_file)
# If a survey with save name exist it will be overwritten
survey_collection.surveys = [
kept_survey for kept_survey in survey_collection.surveys if kept_survey.name != survey_name
Expand Down
16 changes: 10 additions & 6 deletions openfisca_survey_manager/surveys.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def insert_table(self, label = None, name = None, **kwargs):
Inserts a table in the Survey object
If a pandas dataframe is provided, it is saved in the hdf5 file
"""

parquet_file = kwargs.pop('parquet_file', None)
data_frame = kwargs.pop('data_frame', None)
if data_frame is None:
# Try without underscore
Expand All @@ -357,11 +357,15 @@ def insert_table(self, label = None, name = None, **kwargs):
variables = list(data_frame.columns)
if label is None:
label = name
table = Table(label = label, name = name, survey = self, variables = variables)
assert table.survey.hdf5_file_path is not None
log.debug("Saving table {} in {}".format(name, table.survey.hdf5_file_path))
to_hdf_kwargs = kwargs.pop('to_hdf_kwargs', dict())
table.save_data_frame(data_frame, **to_hdf_kwargs)
table = Table(label = label, name = name, survey = self, variables = variables, parquet_file = parquet_file)
assert (table.survey.hdf5_file_path is not None) or (table.survey.parquet_file_path is not None)
if parquet_file is not None:
log.debug("Saving table {} in {}".format(name, table.survey.parquet_file_path))
data_frame.to_parquet(parquet_file)
else:
log.debug("Saving table {} in {}".format(name, table.survey.hdf5_file_path))
to_hdf_kwargs = kwargs.pop('to_hdf_kwargs', dict())
table.save_data_frame(data_frame, **to_hdf_kwargs)

if name not in self.tables:
self.tables[name] = dict()
Expand Down

0 comments on commit 3acd43b

Please sign in to comment.