Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enumerate unzipped files #10842

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions geonode/storage/data_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,34 @@ def _unzip(self, zip_name: str) -> Mapping:
at the end the zip is deleted
"""
zip_file = self.file_paths["base_file"]
the_zip = zipfile.ZipFile(zip_file, allowZip64=True)
the_zip.extractall(self.temporary_folder)
with zipfile.ZipFile(zip_file, allowZip64=True) as the_zip:
the_zip.extractall(self.temporary_folder)

available_choices = get_allowed_extensions()
not_main_files = ["xml", "sld", "zip", "kmz"]
base_file_choices = [x for x in available_choices if x not in not_main_files]
for _file in Path(self.temporary_folder).iterdir():
if any([_file.name.endswith(_ext) for _ext in base_file_choices]):
self.file_paths["base_file"] = Path(str(_file))
elif not zipfile.is_zipfile(str(_file)):
sorted_files = sorted(Path(self.temporary_folder).iterdir())
for _file in sorted_files:
if not zipfile.is_zipfile(str(_file)):
if any([_file.name.endswith(_ext) for _ext in base_file_choices]):
self.file_paths["base_file"] = Path(str(_file))
ext = _file.name.split(".")[-1]
self.file_paths[f"{ext}_file"] = Path(str(_file))
if f"{ext}_file" in self.file_paths:
existing = self.file_paths[f"{ext}_file"]
self.file_paths[f"{ext}_file"] = [
Path(str(_file)),
*(existing if isinstance(existing, list) else [existing]),
]
else:
self.file_paths[f"{ext}_file"] = Path(str(_file))

tmp = self.file_paths.copy()
for key, value in self.file_paths.items():
if isinstance(value, list):
for index, file_path in enumerate(value):
n = f"{key}_{index}" if index > 0 else key
tmp[n] = file_path
self.file_paths = tmp

# removing the zip file
os.remove(zip_name)
Expand Down
16 changes: 15 additions & 1 deletion geonode/storage/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,21 @@ def test_zip_file_should_correctly_recognize_main_extension_with_csv(self):

self.assertIsNotNone(storage_manager.data_retriever.temporary_folder)
_files = storage_manager.get_retrieved_paths()
self.assertTrue("example.csv" in _files.get("base_file"))
# When the zip contains multiple csv files, which one becomes base_file is unspecified, so only the extension is checked
self.assertTrue(_files.get("base_file").endswith(".csv"))

def test_zip_file_should_correctly_index_file_extensions(self):
    # Purpose: after cloning the zipped fixture, the retrieved paths must
    # expose both the plain "csv_file" key and the indexed "csv_file_1" key.

    # build a fresh storage manager that points at the zip fixture
    zip_fixture = os.path.join(f"{self.project_root}", "tests/data/example.zip")
    manager = self.sut(remote_files={"base_file": zip_fixture})
    manager.clone_remote_files()

    # cloning must have produced a temporary working folder
    self.assertIsNotNone(manager.data_retriever.temporary_folder)

    retrieved_paths = manager.get_retrieved_paths()
    # the plain per-extension key is present
    self.assertIsNotNone(retrieved_paths.get("csv_file"))
    # an extension found more than once also gets an indexed key
    self.assertIsNotNone(retrieved_paths.get("csv_file_1"))

@override_settings(
SUPPORTED_DATASET_FILE_TYPES=[
Expand Down
Binary file modified geonode/storage/tests/data/example.zip
Binary file not shown.