diff --git a/importer/api/serializer.py b/importer/api/serializer.py
index 0e254b3a..f201d79a 100644
--- a/importer/api/serializer.py
+++ b/importer/api/serializer.py
@@ -5,7 +5,6 @@ class ImporterSerializer(DynamicModelSerializer):
     class Meta:
-        ref_name = "ImporterSerializer"
         model = Upload
         view_name = "importer_upload"
diff --git a/importer/api/tests.py b/importer/api/tests.py
index 1644b3c4..bbf46831 100644
--- a/importer/api/tests.py
+++ b/importer/api/tests.py
@@ -14,7 +14,6 @@ class TestImporterViewSet(ImporterBaseTestSupport):
-
     @classmethod
     def setUpClass(cls):
         super().setUpClass()
@@ -45,7 +44,7 @@ def test_redirect_to_old_upload_if_file_is_not_a_gpkg(self, patch_upload):
         upload.upload.return_value = HttpResponse()
         patch_upload.return_value = upload
-        self.client.force_login(get_user_model().objects.get(username='admin'))
+        self.client.force_login(get_user_model().objects.get(username="admin"))
         payload = {
             "base_file": SimpleUploadedFile(name="file.invalid", content=b"abc"),
         }
@@ -59,7 +58,7 @@ def test_gpkg_raise_error_with_invalid_payload(self, patch_upload):
         upload.upload.return_value = HttpResponse()
         patch_upload.return_value = upload
-        self.client.force_login(get_user_model().objects.get(username='admin'))
+        self.client.force_login(get_user_model().objects.get(username="admin"))
         payload = {
             "base_file": SimpleUploadedFile(name="test.gpkg", content=b"some-content"),
             "store_spatial_files": "invalid",
@@ -79,7 +78,7 @@ def test_gpkg_raise_error_with_invalid_payload(self, patch_upload):
     def test_gpkg_task_is_called(self, patch_upload):
         patch_upload.apply_async.side_effect = MagicMock()
-        self.client.force_login(get_user_model().objects.get(username='admin'))
+        self.client.force_login(get_user_model().objects.get(username="admin"))
         payload = {
             "base_file": SimpleUploadedFile(name="test.gpkg", content=b"some-content"),
             "store_spatial_files": True,
@@ -93,7 +92,7 @@ def test_gpkg_task_is_called(self, patch_upload):
     def test_geojson_task_is_called(self, patch_upload):
         patch_upload.apply_async.side_effect = MagicMock()
-        self.client.force_login(get_user_model().objects.get(username='admin'))
+        self.client.force_login(get_user_model().objects.get(username="admin"))
         payload = {
             "base_file": SimpleUploadedFile(
                 name="test.geojson", content=b"some-content"
             )
@@ -111,7 +110,7 @@ def test_geojson_task_is_called(self, patch_upload):
     def test_zip_file_is_unzip_and_the_handler_is_found(self, patch_upload):
         patch_upload.apply_async.side_effect = MagicMock()
-        self.client.force_login(get_user_model().objects.get(username='admin'))
+        self.client.force_login(get_user_model().objects.get(username="admin"))
         payload = {
             "base_file": open(f"{project_dir}/tests/fixture/valid.zip", "rb"),
             "zip_file": open(f"{project_dir}/tests/fixture/valid.zip", "rb"),
@@ -123,8 +122,7 @@ def test_zip_file_is_unzip_and_the_handler_is_found(self, patch_upload):
         self.assertEqual(201, response.status_code)
     def test_copy_method_not_allowed(self):
-
-        self.client.force_login(get_user_model().objects.get(username='admin'))
+        self.client.force_login(get_user_model().objects.get(username="admin"))
         response = self.client.get(self.copy_url)
         self.assertEqual(405, response.status_code)
@@ -139,7 +137,7 @@ def test_copy_method_not_allowed(self):
     @patch("importer.api.views.ResourceBaseViewSet.resource_service_copy")
     def test_redirect_to_old_upload_if_file_handler_is_not_set(self, copy_view, _orc):
         copy_view.return_value = HttpResponse()
-        self.client.force_login(get_user_model().objects.get(username='admin'))
+
self.client.force_login(get_user_model().objects.get(username="admin")) response = self.client.put(self.copy_url) @@ -149,10 +147,10 @@ def test_redirect_to_old_upload_if_file_handler_is_not_set(self, copy_view, _orc @patch("importer.api.views.import_orchestrator") def test_copy_ther_resource_if_file_handler_is_set(self, _orc): - user = get_user_model().objects.get(username='admin') + user = get_user_model().objects.get(username="admin") user.is_superuser = True user.save() - self.client.force_login(get_user_model().objects.get(username='admin')) + self.client.force_login(get_user_model().objects.get(username="admin")) ResourceHandlerInfo.objects.create( resource=self.dataset, handler_module_path="importer.handlers.gpkg.handler.GPKGFileHandler", diff --git a/importer/api/views.py b/importer/api/views.py index 7acb4fda..3cd9bb99 100644 --- a/importer/api/views.py +++ b/importer/api/views.py @@ -83,7 +83,6 @@ def get_serializer_class(self): return specific_serializer or ImporterSerializer def create(self, request, *args, **kwargs): - """ Main function called by the new import flow. It received the file via the front end @@ -120,7 +119,6 @@ def create(self, request, *args, **kwargs): handler = orchestrator.get_handler(_data) if _file and handler: - try: # cloning data into a local folder extracted_params, _data = handler.extract_params_from_data(_data) @@ -151,7 +149,7 @@ def create(self, request, *args, **kwargs): legacy_upload_name=_file.name, action=action, name=_file.name, - source='upload' + source="upload", ) sig = import_orchestrator.s( @@ -175,7 +173,6 @@ def create(self, request, *args, **kwargs): class ResourceImporter(DynamicModelViewSet): - authentication_classes = [ SessionAuthentication, BasicAuthentication, @@ -212,7 +209,6 @@ class ResourceImporter(DynamicModelViewSet): def copy(self, request, *args, **kwargs): resource = self.get_object() if resource.resourcehandlerinfo_set.exists(): - handler_module_path = ( resource.resourcehandlerinfo_set.first().handler_module_path ) @@ -241,7 +237,7 @@ def copy(self, request, *args, **kwargs): **{"handler_module_path": handler_module_path}, **extracted_params, }, - source="importer_copy" + source="importer_copy", ) sig = import_orchestrator.s( diff --git a/importer/celery_tasks.py b/importer/celery_tasks.py index 855153ba..8ffc71db 100644 --- a/importer/celery_tasks.py +++ b/importer/celery_tasks.py @@ -21,7 +21,12 @@ from importer.celery_app import importer_app from importer.datastore import DataStoreManager from importer.handlers.gpkg.tasks import SingleMessageErrorHandler -from importer.handlers.utils import create_alternate, drop_dynamic_model_schema, evaluate_error, get_uuid +from importer.handlers.utils import ( + create_alternate, + drop_dynamic_model_schema, + evaluate_error, + get_uuid, +) from importer.orchestrator import orchestrator from importer.publisher import DataPublisher from importer.settings import ( @@ -39,6 +44,7 @@ class ErrorBaseTaskClass(Task): Basic Error task class. Is common to all the base tasks of the import pahse it defines a on_failure method which set the task as "failed" with some extra information """ + max_retries = 3 track_started = True @@ -69,7 +75,6 @@ def import_orchestrator( action=exa.IMPORT.value, **kwargs, ): - """ Base task. Is the task responsible to call the orchestrator and redirect the upload to the next step mainly is a wrapper for the Orchestrator object. 
@@ -102,7 +107,7 @@ def import_orchestrator( @importer_app.task( bind=True, - #base=ErrorBaseTaskClass, + # base=ErrorBaseTaskClass, name="importer.import_resource", queue="importer.import_resource", max_retries=1, @@ -162,7 +167,7 @@ def import_resource(self, execution_id, /, handler_module_path, action, **kwargs layer=None, alternate=None, error=e, - **kwargs + **kwargs, ) raise InvalidInputFileException(detail=error_handler(e, execution_id)) @@ -225,7 +230,9 @@ def publish_resource( ) if data: # we should not publish resource without a crs - if not _overwrite or (_overwrite and not _publisher.get_resource(alternate)): + if not _overwrite or ( + _overwrite and not _publisher.get_resource(alternate) + ): _publisher.publish_resources(data) else: _publisher.overwrite_resources(data) @@ -237,7 +244,9 @@ def publish_resource( celery_task_request=self.request, ) else: - logger.error(f"Layer: {alternate} raised: Only resources with a CRS provided can be published for execution_id: {execution_id}") + logger.error( + f"Layer: {alternate} raised: Only resources with a CRS provided can be published for execution_id: {execution_id}" + ) raise PublishResourceException( "Only resources with a CRS provided can be published" ) @@ -268,7 +277,7 @@ def publish_resource( layer=layer_name, alternate=alternate, error=e, - **kwargs + **kwargs, ) raise PublishResourceException(detail=error_handler(e, execution_id)) @@ -326,17 +335,27 @@ def create_geonode_resource( if _overwrite: resource = handler.overwrite_geonode_resource( - layer_name=layer_name, alternate=alternate, execution_id=execution_id, files=_files + layer_name=layer_name, + alternate=alternate, + execution_id=execution_id, + files=_files, ) else: resource = handler.create_geonode_resource( - layer_name=layer_name, alternate=alternate, execution_id=execution_id, files=_files + layer_name=layer_name, + alternate=alternate, + execution_id=execution_id, + files=_files, ) if _overwrite: - handler.overwrite_resourcehandlerinfo(handler_module_path, resource, _exec, **kwargs) + handler.overwrite_resourcehandlerinfo( + handler_module_path, resource, _exec, **kwargs + ) else: - handler.create_resourcehandlerinfo(handler_module_path, resource, _exec, **kwargs) + handler.create_resourcehandlerinfo( + handler_module_path, resource, _exec, **kwargs + ) # at the end recall the import_orchestrator for the next step import_orchestrator.apply_async( @@ -360,7 +379,7 @@ def create_geonode_resource( layer=layer_name, alternate=alternate, error=e, - **kwargs + **kwargs, ) raise ResourceCreationException(detail=error_handler(e)) @@ -417,10 +436,14 @@ def copy_geonode_resource( _exec=_exec, data_to_update=data_to_update, new_alternate=new_alternate, - **kwargs + **kwargs, ) - handler.create_resourcehandlerinfo(resource=new_resource, handler_module_path=handler_module_path, execution_id=_exec) + handler.create_resourcehandlerinfo( + resource=new_resource, + handler_module_path=handler_module_path, + execution_id=_exec, + ) assert f"{workspace}:{new_alternate}" == new_resource.alternate @@ -453,7 +476,7 @@ def copy_geonode_resource( layer=layer_name, alternate=alternate, error=e, - **kwargs + **kwargs, ) raise CopyResourceException(detail=e) return exec_id, new_alternate @@ -510,9 +533,9 @@ def _create_field(dynamic_model_schema, field, _kwargs): if field["class_name"].endswith("CharField"): _kwargs = {**_kwargs, **{"max_length": 255}} - if field.get('dim', None) is not None: + if field.get("dim", None) is not None: # setting the dimension for the gemetry. 
So that we can handle also 3d geometries
-            _kwargs = {**_kwargs, **{"dim": field.get('dim')}}
+            _kwargs = {**_kwargs, **{"dim": field.get("dim")}}
     # if is a new creation we generate the field model from scratch
     if not overwrite:
@@ -584,7 +607,7 @@ def copy_dynamic_model(
         # Creating the dynamic schema object
         new_schema = dynamic_schema.first()
         new_schema.name = new_dataset_alternate
-        new_schema.db_table_name = new_dataset_alternate
+        new_schema.db_table_name = new_dataset_alternate
         new_schema.pk = None
         new_schema.save()
         # create the field_schema object
@@ -622,7 +645,7 @@ def copy_dynamic_model(
             layer=layer_name,
             alternate=alternate,
             error=e,
-            **{**kwargs, **additional_kwargs}
+            **{**kwargs, **additional_kwargs},
         )
         raise CopyResourceException(detail=e)
     return exec_id, kwargs
@@ -639,9 +662,8 @@ def copy_geonode_data_table(
 ):
     """
     Once the base resource is copied, is time to copy also the dynamic model
-    """
+    """
     try:
-
         orchestrator.update_execution_request_status(
             execution_id=exec_id,
             last_updated=timezone.now(),
@@ -659,7 +681,9 @@ def copy_geonode_data_table(
         db_name = os.getenv("DEFAULT_BACKEND_DATASTORE", "datastore")
         if os.getenv("IMPORTER_ENABLE_DYN_MODELS", False):
-            schema_exists = ModelSchema.objects.filter(name=new_dataset_alternate).first()
+            schema_exists = ModelSchema.objects.filter(
+                name=new_dataset_alternate
+            ).first()
             if schema_exists:
                 db_name = schema_exists.db_name
@@ -691,7 +715,7 @@ def copy_geonode_data_table(
             layer=layer_name,
             alternate=alternate,
             error=e,
-            **kwargs
+            **kwargs,
         )
         raise CopyResourceException(detail=e)
     return exec_id, kwargs
@@ -710,7 +734,7 @@ def rollback(self, *args, **kwargs):
     The handler must implement the code to rollback each step that is declared
     """
-
+
     exec_id = get_uuid(args)
     logger.info(f"Calling rollback for execution_id {exec_id} in progress")
@@ -729,14 +753,11 @@ def rollback(self, *args, **kwargs):
     )
     handler = import_string(handler_module_path)()
-    handler.rollback(
-        exec_id,
-        rollback_from_step,
-        action_to_rollback,
-        *args,
-        **kwargs
+    handler.rollback(exec_id, rollback_from_step, action_to_rollback, *args, **kwargs)
+    error = (
+        find_key_recursively(kwargs, "error")
+        or "Some issue has occurred, please check the logs"
     )
-    error = find_key_recursively(kwargs, "error") or "Some issue has occured, please check the logs"
     orchestrator.set_as_failed(exec_id, reason=error)
     return exec_id, kwargs
diff --git a/importer/datastore.py b/importer/datastore.py
index 8362e1c4..7a629a81 100644
--- a/importer/datastore.py
+++ b/importer/datastore.py
@@ -36,5 +36,4 @@ def start_import(self, execution_id, **kwargs):
         """
         call the resource handler object to perform the import phase
         """
-        return self.handler().import_resource(self.files, execution_id, **kwargs)
-
\ No newline at end of file
+        return self.handler().import_resource(self.files, execution_id, **kwargs)
diff --git a/importer/handlers/apps.py b/importer/handlers/apps.py
index ae6711ff..45b6456c 100644
--- a/importer/handlers/apps.py
+++ b/importer/handlers/apps.py
@@ -19,11 +19,15 @@ def ready(self):
 def run_setup_hooks(*args, **kwargs):
-    if getattr(settings, 'IMPORTER_HANDLERS', []):
-        _handlers = [import_string(module_path) for module_path in settings.IMPORTER_HANDLERS]
+    if getattr(settings, "IMPORTER_HANDLERS", []):
+        _handlers = [
+            import_string(module_path) for module_path in settings.IMPORTER_HANDLERS
+        ]
         for item in _handlers:
             item.register()
-        logger.info(f"The following handlers have been registered: {', '.join(settings.IMPORTER_HANDLERS)}")
+        logger.info(
+            f"The
following handlers have been registered: {', '.join(settings.IMPORTER_HANDLERS)}" + ) _available_settings = [ import_string(module_path)().supported_file_extension_config @@ -71,7 +75,9 @@ def run_setup_hooks(*args, **kwargs): supported_type.extend(_available_settings) if not getattr(settings, "ADDITIONAL_DATASET_FILE_TYPES", None): setattr(settings, "ADDITIONAL_DATASET_FILE_TYPES", supported_type) - elif "gpkg" not in [x.get("id") for x in settings.ADDITIONAL_DATASET_FILE_TYPES]: + elif "gpkg" not in [ + x.get("id") for x in settings.ADDITIONAL_DATASET_FILE_TYPES + ]: settings.ADDITIONAL_DATASET_FILE_TYPES.extend(supported_type) setattr( settings, diff --git a/importer/handlers/base.py b/importer/handlers/base.py index 45ba6449..72d07726 100644 --- a/importer/handlers/base.py +++ b/importer/handlers/base.py @@ -75,10 +75,10 @@ def can_handle(_data) -> bool: @staticmethod def has_serializer(_data) -> bool: - ''' + """ This endpoint should return (if set) the custom serializer used in the API to validate the input resource - ''' + """ return None @staticmethod @@ -100,10 +100,16 @@ def extract_params_from_data(_data): return [] def fixup_name(self, name): - return name.lower().replace("-", "_")\ - .replace(" ", "_").replace(")", "")\ - .replace("(", "").replace(",", "")\ - .replace("&", "").replace(".", "") + return ( + name.lower() + .replace("-", "_") + .replace(" ", "_") + .replace(")", "") + .replace("(", "") + .replace(",", "") + .replace("&", "") + .replace(".", "") + ) def extract_resource_to_publish(self, files, layer_name, alternate, **kwargs): """ diff --git a/importer/handlers/common/raster.py b/importer/handlers/common/raster.py index 8c6a22cf..b52a4e46 100644 --- a/importer/handlers/common/raster.py +++ b/importer/handlers/common/raster.py @@ -16,8 +16,7 @@ from geonode.resource.enumerator import ExecutionRequestAction as exa from geonode.resource.manager import resource_manager from geonode.resource.models import ExecutionRequest -from geonode.services.serviceprocessors.base import \ - get_geoserver_cascading_workspace +from geonode.services.serviceprocessors.base import get_geoserver_cascading_workspace from importer.api.exception import ImportException from importer.celery_tasks import ErrorBaseTaskClass, import_orchestrator from importer.handlers.base import BaseHandler @@ -69,10 +68,10 @@ def can_handle(_data) -> bool: @staticmethod def has_serializer(_data) -> bool: - ''' + """ This endpoint will return True or False if with the info provided the handler is able to handle the file or not - ''' + """ return False @staticmethod @@ -121,7 +120,7 @@ def publish_resources(resources: List[str], catalog, store, workspace): layer_name=_resource.get("name"), workspace=workspace, overwrite=True, - upload_data=False + upload_data=False, ) except Exception as e: if ( @@ -141,12 +140,12 @@ def delete_resource(instance): @staticmethod def perform_last_step(execution_id): - ''' + """ Override this method if there is some extra step to perform before considering the execution as completed. 
For example can be used to trigger an email-send to notify that the execution is completed - ''' + """ # as last step, we delete the celery task to keep the number of rows under control lower_exec_id = execution_id.replace("-", "_").lower() TaskResult.objects.filter( @@ -162,34 +161,46 @@ def perform_last_step(execution_id): _exec.save() - _exec.output_params.update(**{ - "detail_url": [ - x.resource.detail_url - for x in ResourceHandlerInfo.objects.filter(execution_request=_exec) - ] - }) - + _exec.output_params.update( + **{ + "detail_url": [ + x.resource.detail_url + for x in ResourceHandlerInfo.objects.filter(execution_request=_exec) + ] + } + ) - def extract_resource_to_publish(self, files, action, layer_name, alternate, **kwargs): + def extract_resource_to_publish( + self, files, action, layer_name, alternate, **kwargs + ): if action == exa.COPY.value: return [ { "name": alternate, - "crs": ResourceBase.objects.filter(Q(alternate__icontains=layer_name) | Q(title__icontains=layer_name)) + "crs": ResourceBase.objects.filter( + Q(alternate__icontains=layer_name) + | Q(title__icontains=layer_name) + ) .first() .srid, - "raster_path": kwargs['kwargs'].get("new_file_location").get("files")[0] + "raster_path": kwargs["kwargs"] + .get("new_file_location") + .get("files")[0], } ] layers = gdal.Open(files.get("base_file")) if not layers: return [] - return [{ + return [ + { "name": alternate or layer_name, - "crs": self.identify_authority(layers) if layers.GetSpatialRef() else None, - "raster_path": files.get("base_file") - }] + "crs": self.identify_authority(layers) + if layers.GetSpatialRef() + else None, + "raster_path": files.get("base_file"), + } + ] def identify_authority(self, layer): try: @@ -200,12 +211,20 @@ def identify_authority(self, layer): layer_proj4 = layer.GetSpatialRef().ExportToProj4() _code = pyproj.CRS(layer_proj4).to_epsg(min_confidence=20) if _code is None: - raise Exception("CRS authority code not found, fallback to default behaviour") + raise Exception( + "CRS authority code not found, fallback to default behaviour" + ) except: spatial_ref = layer.GetSpatialRef() spatial_ref.AutoIdentifyEPSG() - _name = spatial_ref.GetAuthorityName(None) or spatial_ref.GetAttrValue('AUTHORITY', 0) - _code = spatial_ref.GetAuthorityCode('PROJCS') or spatial_ref.GetAuthorityCode('GEOGCS') or spatial_ref.GetAttrValue('AUTHORITY', 1) + _name = spatial_ref.GetAuthorityName(None) or spatial_ref.GetAttrValue( + "AUTHORITY", 0 + ) + _code = ( + spatial_ref.GetAuthorityCode("PROJCS") + or spatial_ref.GetAuthorityCode("GEOGCS") + or spatial_ref.GetAttrValue("AUTHORITY", 1) + ) return f"{_name}:{_code}" def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: @@ -218,7 +237,9 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: logger.info("Total number of layers available: 1") _exec = self._get_execution_request_object(execution_id) _input = {**_exec.input_params, **{"total_layers": 1}} - orchestrator.update_execution_request_status(execution_id=str(execution_id), input_params=_input) + orchestrator.update_execution_request_status( + execution_id=str(execution_id), input_params=_input + ) try: filename = Path(files.get("base_file")).stem @@ -227,15 +248,11 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: should_be_overwritten = _exec.input_params.get("overwrite_existing_layer") # should_be_imported check if the user+layername already exists or not - if ( - should_be_imported( - layer_name, - _exec.user, - 
skip_existing_layer=_exec.input_params.get( - "skip_existing_layer" - ), - overwrite_existing_layer=should_be_overwritten, - ) + if should_be_imported( + layer_name, + _exec.user, + skip_existing_layer=_exec.input_params.get("skip_existing_layer"), + overwrite_existing_layer=should_be_overwritten, ): workspace = get_geoserver_cascading_workspace(create=False) user_datasets = Dataset.objects.filter( @@ -268,7 +285,12 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: return def create_geonode_resource( - self, layer_name: str, alternate: str, execution_id: str, resource_type: Dataset = Dataset, files=None + self, + layer_name: str, + alternate: str, + execution_id: str, + resource_type: Dataset = Dataset, + files=None, ): """ Base function to create the resource into geonode. Each handler can specify @@ -302,7 +324,12 @@ def create_geonode_resource( dirty_state=True, title=layer_name, owner=_exec.user, - files=list(set(list(_exec.input_params.get("files", {}).values()) or list(files))), + files=list( + set( + list(_exec.input_params.get("files", {}).values()) + or list(files) + ) + ), ), ) @@ -319,9 +346,13 @@ def create_geonode_resource( return saved_dataset def overwrite_geonode_resource( - self, layer_name: str, alternate: str, execution_id: str, resource_type: Dataset = Dataset, files=None + self, + layer_name: str, + alternate: str, + execution_id: str, + resource_type: Dataset = Dataset, + files=None, ): - dataset = resource_type.objects.filter(alternate__icontains=alternate) _exec = self._get_execution_request_object(execution_id) @@ -337,14 +368,18 @@ def overwrite_geonode_resource( self.handle_xml_file(dataset, _exec) self.handle_sld_file(dataset, _exec) - resource_manager.set_thumbnail(self.object.uuid, instance=self.object, overwrite=False) + resource_manager.set_thumbnail( + self.object.uuid, instance=self.object, overwrite=False + ) dataset.refresh_from_db() return dataset elif not dataset.exists() and _overwrite: logger.warning( f"The dataset required {alternate} does not exists, but an overwrite is required, the resource will be created" ) - return self.create_geonode_resource(layer_name, alternate, execution_id, resource_type, files) + return self.create_geonode_resource( + layer_name, alternate, execution_id, resource_type, files + ) elif not dataset.exists() and not _overwrite: logger.warning( "The resource does not exists, please use 'create_geonode_resource' to create one" @@ -372,7 +407,13 @@ def handle_sld_file(self, saved_dataset: Dataset, _exec: ExecutionRequest): vals={"dirty_state": True}, ) - def create_resourcehandlerinfo(self, handler_module_path: str, resource: Dataset, execution_id: ExecutionRequest, **kwargs): + def create_resourcehandlerinfo( + self, + handler_module_path: str, + resource: Dataset, + execution_id: ExecutionRequest, + **kwargs, + ): """ Create relation between the GeonodeResource and the handler used to create/copy it @@ -381,10 +422,16 @@ def create_resourcehandlerinfo(self, handler_module_path: str, resource: Dataset handler_module_path=str(handler_module_path), resource=resource, execution_request=execution_id, - kwargs=kwargs.get('kwargs', {}) + kwargs=kwargs.get("kwargs", {}), ) - def overwrite_resourcehandlerinfo(self, handler_module_path: str, resource: Dataset, execution_id: ExecutionRequest, **kwargs): + def overwrite_resourcehandlerinfo( + self, + handler_module_path: str, + resource: Dataset, + execution_id: ExecutionRequest, + **kwargs, + ): """ Overwrite the ResourceHandlerInfo """ @@ -393,19 +440,29 
@@ def overwrite_resourcehandlerinfo(self, handler_module_path: str, resource: Data handler_module_path=handler_module_path, resource=resource, execution_request=execution_id, - kwargs=kwargs.get('kwargs', {}) or kwargs + kwargs=kwargs.get("kwargs", {}) or kwargs, ) return - return self.create_resourcehandlerinfo(handler_module_path, resource, execution_id, **kwargs) + return self.create_resourcehandlerinfo( + handler_module_path, resource, execution_id, **kwargs + ) def copy_geonode_resource( - self, alternate: str, resource: Dataset, _exec: ExecutionRequest, data_to_update: dict, new_alternate: str, **kwargs + self, + alternate: str, + resource: Dataset, + _exec: ExecutionRequest, + data_to_update: dict, + new_alternate: str, + **kwargs, ): resource = self.create_geonode_resource( layer_name=data_to_update.get("title"), alternate=new_alternate, execution_id=str(_exec.exec_id), - files=kwargs.get("kwargs", {}).get("new_file_location", {}).get("files", []) + files=kwargs.get("kwargs", {}) + .get("new_file_location", {}) + .get("files", []), ) resource.refresh_from_db() return resource @@ -415,28 +472,34 @@ def _get_execution_request_object(self, execution_id: str): @staticmethod def copy_original_file(dataset): - ''' + """ Copy the original file into a new location - ''' + """ return storage_manager.copy(dataset) - def rollback(self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs): + def rollback( + self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs + ): steps = self.ACTIONS.get(action_to_rollback) step_index = steps.index(rollback_from_step) # the start_import, start_copy etc.. dont do anything as step, is just the start # so there is nothing to rollback - steps_to_rollback = steps[1:step_index+1] + steps_to_rollback = steps[1 : step_index + 1] if not steps_to_rollback: return # reversing the tuple to going backwards with the rollback reversed_steps = steps_to_rollback[::-1] istance_name = None try: - istance_name = find_key_recursively(kwargs, "new_dataset_alternate") or args[3] + istance_name = ( + find_key_recursively(kwargs, "new_dataset_alternate") or args[3] + ) except: pass - - logger.warning(f"Starting rollback for execid: {exec_id} resource published was: {istance_name}") + + logger.warning( + f"Starting rollback for execid: {exec_id} resource published was: {istance_name}" + ) for step in reversed_steps: normalized_step_name = step.split(".")[-1] @@ -444,38 +507,48 @@ def rollback(self, exec_id, rollback_from_step, action_to_rollback, *args, **kwa function = getattr(self, f"_{normalized_step_name}_rollback") function(exec_id, istance_name, *args, **kwargs) - logger.warning(f"Rollback for execid: {exec_id} resource published was: {istance_name} completed") + logger.warning( + f"Rollback for execid: {exec_id} resource published was: {istance_name} completed" + ) def _import_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs): - ''' + """ In the raster, this step just generate the alternate, no real action are done on the database - ''' + """ pass - def _publish_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs): - ''' + def _publish_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs): + """ We delete the resource from geoserver - ''' - logger.info(f"Rollback publishing step in progress for execid: {exec_id} resource published was: {istance_name}") + """ + logger.info( + f"Rollback publishing step in progress for execid: {exec_id} resource published was: {istance_name}" + ) exec_object = 
orchestrator.get_execution_object(exec_id) handler_module_path = exec_object.input_params.get("handler_module_path") publisher = DataPublisher(handler_module_path=handler_module_path) publisher.delete_resource(istance_name) - - def _create_geonode_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs): - ''' + + def _create_geonode_resource_rollback( + self, exec_id, istance_name=None, *args, **kwargs + ): + """ The handler will remove the resource from geonode - ''' - logger.info(f"Rollback geonode step in progress for execid: {exec_id} resource created was: {istance_name}") + """ + logger.info( + f"Rollback geonode step in progress for execid: {exec_id} resource created was: {istance_name}" + ) resource = ResourceBase.objects.filter(alternate__icontains=istance_name) if resource.exists(): resource.delete() - + def _copy_dynamic_model_rollback(self, exec_id, istance_name=None, *args, **kwargs): self._import_resource_rollback(exec_id, istance_name=istance_name) - - def _copy_geonode_resource_rollback(self, exec_id, istance_name=None, *args, **kwargs): + + def _copy_geonode_resource_rollback( + self, exec_id, istance_name=None, *args, **kwargs + ): self._create_geonode_resource_rollback(exec_id, istance_name=istance_name) @@ -489,16 +562,10 @@ def _copy_geonode_resource_rollback(self, exec_id, istance_name=None, *args, **k task_track_started=True, ) def copy_raster_file( - exec_id, - actual_step, - layer_name, - alternate, - handler_module_path, - action, - **kwargs + exec_id, actual_step, layer_name, alternate, handler_module_path, action, **kwargs ): """ - Perform a copy of the original raster file """ + Perform a copy of the original raster file""" original_dataset = ResourceBase.objects.filter(alternate=alternate) if not original_dataset.exists(): @@ -507,16 +574,20 @@ def copy_raster_file( original_dataset = original_dataset.first() if not original_dataset.files: - raise InvalidGeoTiffException("The original file of the dataset is not available, Is not possible to copy the dataset") + raise InvalidGeoTiffException( + "The original file of the dataset is not available, Is not possible to copy the dataset" + ) - new_file_location = orchestrator.load_handler(handler_module_path).copy_original_file(original_dataset) + new_file_location = orchestrator.load_handler( + handler_module_path + ).copy_original_file(original_dataset) new_dataset_alternate = create_alternate(original_dataset.title, exec_id) additional_kwargs = { "original_dataset_alternate": original_dataset.alternate, "new_dataset_alternate": new_dataset_alternate, - "new_file_location": new_file_location + "new_file_location": new_file_location, } task_params = ( diff --git a/importer/handlers/common/tests_raster.py b/importer/handlers/common/tests_raster.py index 4d4b717c..ad2c197d 100644 --- a/importer/handlers/common/tests_raster.py +++ b/importer/handlers/common/tests_raster.py @@ -19,9 +19,7 @@ def setUpClass(cls): cls.user, _ = get_user_model().objects.get_or_create(username="admin") cls.valid_files = {"base_file": cls.valid_raster} cls.owner = get_user_model().objects.first() - cls.layer = create_single_dataset( - name="test_grid", owner=cls.owner - ) + cls.layer = create_single_dataset(name="test_grid", owner=cls.owner) def test_create_error_log(self): """ diff --git a/importer/handlers/common/tests_vector.py b/importer/handlers/common/tests_vector.py index cdc3baf3..954584aa 100644 --- a/importer/handlers/common/tests_vector.py +++ b/importer/handlers/common/tests_vector.py @@ -225,7 +225,10 @@ def 
test_import_with_ogr2ogr_without_errors_should_call_the_right_command( _open.assert_called_once() _open.assert_called_with( - f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_files.get("base_file")}" -lco DIM=2 -nln alternate "dataset"', stdout=-1, stderr=-1, shell=True # noqa + f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_files.get("base_file")}" -lco DIM=2 -nln alternate "dataset"', + stdout=-1, + stderr=-1, + shell=True, # noqa ) @patch("importer.handlers.common.vector.Popen") @@ -248,5 +251,8 @@ def test_import_with_ogr2ogr_with_errors_should_raise_exception(self, _open): _open.assert_called_once() _open.assert_called_with( - f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_files.get("base_file")}" -lco DIM=2 -nln alternate "dataset"', stdout=-1, stderr=-1, shell=True # noqa + f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_files.get("base_file")}" -lco DIM=2 -nln alternate "dataset"', + stdout=-1, + stderr=-1, + shell=True, # noqa ) diff --git a/importer/handlers/common/vector.py b/importer/handlers/common/vector.py index 0e72497d..85cc77b2 100644 --- a/importer/handlers/common/vector.py +++ b/importer/handlers/common/vector.py @@ -76,10 +76,10 @@ def can_handle(_data) -> bool: @staticmethod def has_serializer(_data) -> bool: - ''' + """ This endpoint will return True or False if with the info provided the handler is able to handle the file or not - ''' + """ return False @staticmethod @@ -157,7 +157,7 @@ def create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate): % (db_name, db_host, db_port, db_user, db_password) ) options += f'"{files.get("base_file")}"' + " " -# options += "-lco DIM=2 " + # options += "-lco DIM=2 " options += f'-nln {alternate} "{original_name}"' if ovverwrite_layer: @@ -176,11 +176,13 @@ def delete_resource(instance): if os.getenv("IMPORTER_ENABLE_DYN_MODELS", False): schema = ModelSchema.objects.filter(name=name).first() if schema: - ''' + """ We use the schema editor directly, because the model itself is not managed on creation, but for the delete since we are going to handle, we can use it - ''' - _model_editor = ModelSchemaEditor(initial_model=name, db_name=schema.db_name) + """ + _model_editor = ModelSchemaEditor( + initial_model=name, db_name=schema.db_name + ) _model_editor.drop_table(schema.as_model()) ModelSchema.objects.filter(name=name).delete() except Exception as e: @@ -188,12 +190,12 @@ def delete_resource(instance): @staticmethod def perform_last_step(execution_id): - ''' + """ Override this method if there is some extra step to perform before considering the execution as completed. 
For example can be used to trigger an email-send to notify that the execution is completed - ''' + """ # as last step, we delete the celery task to keep the number of rows under control lower_exec_id = execution_id.replace("-", "_").lower() TaskResult.objects.filter( @@ -207,12 +209,14 @@ def perform_last_step(execution_id): _exec = orchestrator.get_execution_object(execution_id) - _exec.output_params.update(**{ - "detail_url": [ - x.resource.detail_url - for x in ResourceHandlerInfo.objects.filter(execution_request=_exec) - ] - }) + _exec.output_params.update( + **{ + "detail_url": [ + x.resource.detail_url + for x in ResourceHandlerInfo.objects.filter(execution_request=_exec) + ] + } + ) _exec.save() if _exec and not _exec.input_params.get("store_spatial_file", False): resources = ResourceHandlerInfo.objects.filter(execution_request=_exec) @@ -221,12 +225,17 @@ def perform_last_step(execution_id): # better to delete each single file since it can be a remove storage service list(map(storage_manager.delete, resources_files)) - def extract_resource_to_publish(self, files, action, layer_name, alternate, **kwargs): + def extract_resource_to_publish( + self, files, action, layer_name, alternate, **kwargs + ): if action == exa.COPY.value: return [ { "name": alternate, - "crs": ResourceBase.objects.filter(Q(alternate__icontains=layer_name) | Q(title__icontains=layer_name)) + "crs": ResourceBase.objects.filter( + Q(alternate__icontains=layer_name) + | Q(title__icontains=layer_name) + ) .first() .srid, } @@ -238,7 +247,7 @@ def extract_resource_to_publish(self, files, action, layer_name, alternate, **kw return [ { "name": alternate or layer_name, - "crs": self.identify_authority(_l) if _l.GetSpatialRef() else None + "crs": self.identify_authority(_l) if _l.GetSpatialRef() else None, } for _l in layers if self.fixup_name(_l.GetName()) == layer_name @@ -253,12 +262,20 @@ def identify_authority(self, layer): layer_proj4 = layer.GetSpatialRef().ExportToProj4() _code = pyproj.CRS(layer_proj4).to_epsg(min_confidence=20) if _code is None: - raise Exception("CRS authority code not found, fallback to default behaviour") + raise Exception( + "CRS authority code not found, fallback to default behaviour" + ) except: spatial_ref = layer.GetSpatialRef() spatial_ref.AutoIdentifyEPSG() - _name = spatial_ref.GetAuthorityName(None) or spatial_ref.GetAttrValue('AUTHORITY', 0) - _code = spatial_ref.GetAuthorityCode('PROJCS') or spatial_ref.GetAuthorityCode('GEOGCS') or spatial_ref.GetAttrValue('AUTHORITY', 1) + _name = spatial_ref.GetAuthorityName(None) or spatial_ref.GetAttrValue( + "AUTHORITY", 0 + ) + _code = ( + spatial_ref.GetAuthorityCode("PROJCS") + or spatial_ref.GetAuthorityCode("GEOGCS") + or spatial_ref.GetAttrValue("AUTHORITY", 1) + ) return f"{_name}:{_code}" def get_ogr2ogr_driver(self): @@ -279,16 +296,19 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: logger.info(f"Total number of layers available: {layer_count}") _exec = self._get_execution_request_object(execution_id) _input = {**_exec.input_params, **{"total_layers": layer_count}} - orchestrator.update_execution_request_status(execution_id=str(execution_id), input_params=_input) + orchestrator.update_execution_request_status( + execution_id=str(execution_id), input_params=_input + ) dynamic_model = None celery_group = None try: # start looping on the layers available for index, layer in enumerate(layers, start=1): - layer_name = self.fixup_name(layer.GetName()) - should_be_overwritten = 
_exec.input_params.get("overwrite_existing_layer") + should_be_overwritten = _exec.input_params.get( + "overwrite_existing_layer" + ) # should_be_imported check if the user+layername already exists or not if ( should_be_imported( @@ -299,18 +319,27 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: ), overwrite_existing_layer=should_be_overwritten, ) - #and layer.GetGeometryColumn() is not None + # and layer.GetGeometryColumn() is not None ): # update the execution request object # setup dynamic model and retrieve the group task needed for tun the async workflow - # create the async task for create the resource into geonode_data with ogr2ogr + # create the async task for create the resource into geonode_data with ogr2ogr if os.getenv("IMPORTER_ENABLE_DYN_MODELS", False): - dynamic_model, alternate, celery_group = self.setup_dynamic_model( - layer, execution_id, should_be_overwritten, username=_exec.user + ( + dynamic_model, + alternate, + celery_group, + ) = self.setup_dynamic_model( + layer, + execution_id, + should_be_overwritten, + username=_exec.user, ) else: - alternate = self.find_alternate_by_dataset(_exec, layer_name, should_be_overwritten) - + alternate = self.find_alternate_by_dataset( + _exec, layer_name, should_be_overwritten + ) + ogr_res = self.get_ogr2ogr_task_group( execution_id, files, @@ -339,7 +368,7 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: "importer.import_resource", layer_name, alternate, - **kwargs + **kwargs, ) ) except Exception as e: @@ -355,7 +384,9 @@ def import_resource(self, files: dict, execution_id: str, **kwargs) -> str: def find_alternate_by_dataset(self, _exec_obj, layer_name, should_be_overwritten): workspace = get_geoserver_cascading_workspace(create=False) - dataset_available = Dataset.objects.filter(alternate__iexact=f"{workspace.name}:{layer_name}") + dataset_available = Dataset.objects.filter( + alternate__iexact=f"{workspace.name}:{layer_name}" + ) dataset_exists = dataset_available.exists() @@ -367,7 +398,7 @@ def find_alternate_by_dataset(self, _exec_obj, layer_name, should_be_overwritten alternate = create_alternate(layer_name, str(_exec_obj.exec_id)) return alternate - + def setup_dynamic_model( self, layer: ogr.Layer, @@ -401,10 +432,10 @@ def setup_dynamic_model( """ dynamic_schema = dynamic_schema.get() elif not dataset_exists and not dynamic_schema_exists: - ''' + """ cames here when is a new brand upload or when (for any reasons) the dataset exists but the dynamic model has not been created before - ''' + """ # layer_name = create_alternate(layer_name, execution_id) dynamic_schema = ModelSchema.objects.create( name=layer_name, @@ -412,10 +443,10 @@ def setup_dynamic_model( managed=False, db_table_name=layer_name, ) - elif (not dataset_exists and dynamic_schema_exists) or ( - dataset_exists and dynamic_schema_exists and not should_be_overwritten - ) or ( - dataset_exists and not dynamic_schema_exists + elif ( + (not dataset_exists and dynamic_schema_exists) + or (dataset_exists and dynamic_schema_exists and not should_be_overwritten) + or (dataset_exists and not dynamic_schema_exists) ): """ it comes here when the layer should not be overrided so we append the UUID @@ -456,20 +487,34 @@ def create_dynamic_model_fields( {"name": x.name.lower(), "class_name": self._get_type(x), "null": True} for x in layer.schema ] - if layer.GetGeometryColumn() or self.default_geometry_column_name and ogr.GeometryTypeToName(layer.GetGeomType()) not in ['Geometry Collection', 'Unknown (any)', 
'None']: + if ( + layer.GetGeometryColumn() + or self.default_geometry_column_name + and ogr.GeometryTypeToName(layer.GetGeomType()) + not in ["Geometry Collection", "Unknown (any)", "None"] + ): # the geometry colum is not returned rom the layer.schema, so we need to extract it manually layer_schema += [ { - "name": layer.GetGeometryColumn() or self.default_geometry_column_name, - "class_name": GEOM_TYPE_MAPPING.get(self.promote_to_multi(ogr.GeometryTypeToName(layer.GetGeomType()))), - "dim": 2 if not ogr.GeometryTypeToName(layer.GetGeomType()).lower().startswith('3d') else 3 + "name": layer.GetGeometryColumn() + or self.default_geometry_column_name, + "class_name": GEOM_TYPE_MAPPING.get( + self.promote_to_multi( + ogr.GeometryTypeToName(layer.GetGeomType()) + ) + ), + "dim": 2 + if not ogr.GeometryTypeToName(layer.GetGeomType()) + .lower() + .startswith("3d") + else 3, } ] # ones we have the schema, here we create a list of chunked value # so the async task will handle max of 30 field per task list_chunked = [ - layer_schema[i: i + 30] for i in range(0, len(layer_schema), 30) + layer_schema[i : i + 30] for i in range(0, len(layer_schema), 30) ] # definition of the celery group needed to run the async workflow. @@ -484,15 +529,20 @@ def create_dynamic_model_fields( return dynamic_model_schema, celery_group def promote_to_multi(self, geometry_name: str): - ''' + """ If needed change the name of the geometry, by promoting it to Multi example if is Point -> MultiPoint Needed for the shapefiles - ''' + """ return geometry_name def create_geonode_resource( - self, layer_name: str, alternate: str, execution_id: str, resource_type: Dataset = Dataset, files=None + self, + layer_name: str, + alternate: str, + execution_id: str, + resource_type: Dataset = Dataset, + files=None, ): """ Base function to create the resource into geonode. 
Each handler can specify @@ -527,7 +577,12 @@ def create_geonode_resource( dirty_state=True, title=layer_name, owner=_exec.user, - files=list(set(list(_exec.input_params.get("files", {}).values()) or list(files))), + files=list( + set( + list(_exec.input_params.get("files", {}).values()) + or list(files) + ) + ), ), ) @@ -544,9 +599,13 @@ def create_geonode_resource( return saved_dataset def overwrite_geonode_resource( - self, layer_name: str, alternate: str, execution_id: str, resource_type: Dataset = Dataset, files=None + self, + layer_name: str, + alternate: str, + execution_id: str, + resource_type: Dataset = Dataset, + files=None, ): - dataset = resource_type.objects.filter(alternate__icontains=alternate) _exec = self._get_execution_request_object(execution_id) @@ -557,19 +616,25 @@ def overwrite_geonode_resource( if dataset.exists() and _overwrite: dataset = dataset.first() - dataset = resource_manager.update(dataset.uuid, instance=dataset, files=files) + dataset = resource_manager.update( + dataset.uuid, instance=dataset, files=files + ) self.handle_xml_file(dataset, _exec) self.handle_sld_file(dataset, _exec) - resource_manager.set_thumbnail(dataset.uuid, instance=dataset, overwrite=False) + resource_manager.set_thumbnail( + dataset.uuid, instance=dataset, overwrite=False + ) dataset.refresh_from_db() return dataset elif not dataset.exists() and _overwrite: logger.warning( f"The dataset required {alternate} does not exists, but an overwrite is required, the resource will be created" ) - return self.create_geonode_resource(layer_name, alternate, execution_id, resource_type, files) + return self.create_geonode_resource( + layer_name, alternate, execution_id, resource_type, files + ) elif not dataset.exists() and not _overwrite: logger.warning( "The resource does not exists, please use 'create_geonode_resource' to create one" @@ -597,7 +662,13 @@ def handle_sld_file(self, saved_dataset: Dataset, _exec: ExecutionRequest): vals={"dirty_state": True}, ) - def create_resourcehandlerinfo(self, handler_module_path: str, resource: Dataset, execution_id: ExecutionRequest, **kwargs): + def create_resourcehandlerinfo( + self, + handler_module_path: str, + resource: Dataset, + execution_id: ExecutionRequest, + **kwargs, + ): """ Create relation between the GeonodeResource and the handler used to create/copy it @@ -606,10 +677,16 @@ def create_resourcehandlerinfo(self, handler_module_path: str, resource: Dataset handler_module_path=handler_module_path, resource=resource, execution_request=execution_id, - kwargs=kwargs.get('kwargs', {}) or kwargs + kwargs=kwargs.get("kwargs", {}) or kwargs, ) - def overwrite_resourcehandlerinfo(self, handler_module_path: str, resource: Dataset, execution_id: ExecutionRequest, **kwargs): + def overwrite_resourcehandlerinfo( + self, + handler_module_path: str, + resource: Dataset, + execution_id: ExecutionRequest, + **kwargs, + ): """ Overwrite the ResourceHandlerInfo """ @@ -618,25 +695,38 @@ def overwrite_resourcehandlerinfo(self, handler_module_path: str, resource: Data handler_module_path=handler_module_path, resource=resource, execution_request=execution_id, - kwargs=kwargs.get('kwargs', {}) or kwargs + kwargs=kwargs.get("kwargs", {}) or kwargs, ) return - return self.create_resourcehandlerinfo(handler_module_path, resource, execution_id, **kwargs) + return self.create_resourcehandlerinfo( + handler_module_path, resource, execution_id, **kwargs + ) def copy_geonode_resource( - self, alternate: str, resource: Dataset, _exec: ExecutionRequest, data_to_update: 
dict, new_alternate: str, **kwargs + self, + alternate: str, + resource: Dataset, + _exec: ExecutionRequest, + data_to_update: dict, + new_alternate: str, + **kwargs, ): resource = self.create_geonode_resource( layer_name=data_to_update.get("title"), alternate=new_alternate, execution_id=str(_exec.exec_id), - files=resource.files + files=resource.files, ) resource.refresh_from_db() return resource def get_ogr2ogr_task_group( - self, execution_id: str, files: dict, layer, should_be_overwritten: bool, alternate: str + self, + execution_id: str, + files: dict, + layer, + should_be_overwritten: bool, + alternate: str, ): """ In case the OGR2OGR is different from the default one, is enough to ovverride this method @@ -661,24 +751,29 @@ def _get_type(self, _type: str): """ return STANDARD_TYPE_MAPPING.get(ogr.FieldDefn.GetTypeName(_type)) - def rollback(self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs): - + def rollback( + self, exec_id, rollback_from_step, action_to_rollback, *args, **kwargs + ): steps = self.ACTIONS.get(action_to_rollback) step_index = steps.index(rollback_from_step) # the start_import, start_copy etc.. dont do anything as step, is just the start # so there is nothing to rollback - steps_to_rollback = steps[1:step_index+1] + steps_to_rollback = steps[1 : step_index + 1] if not steps_to_rollback: return # reversing the tuple to going backwards with the rollback reversed_steps = steps_to_rollback[::-1] instance_name = None try: - instance_name = find_key_recursively(kwargs, "new_dataset_alternate") or args[3] + instance_name = ( + find_key_recursively(kwargs, "new_dataset_alternate") or args[3] + ) except: pass - - logger.warning(f"Starting rollback for execid: {exec_id} resource published was: {instance_name}") + + logger.warning( + f"Starting rollback for execid: {exec_id} resource published was: {instance_name}" + ) for step in reversed_steps: normalized_step_name = step.split(".")[-1] @@ -686,24 +781,32 @@ def rollback(self, exec_id, rollback_from_step, action_to_rollback, *args, **kwa function = getattr(self, f"_{normalized_step_name}_rollback") function(exec_id, instance_name, *args, **kwargs) - logger.warning(f"Rollback for execid: {exec_id} resource published was: {instance_name} completed") + logger.warning( + f"Rollback for execid: {exec_id} resource published was: {instance_name} completed" + ) def _import_resource_rollback(self, exec_id, instance_name=None, *args, **kwargs): - ''' + """ We use the schema editor directly, because the model itself is not managed on creation, but for the delete since we are going to handle, we can use it - ''' - logger.info(f"Rollback dynamic model & ogr2ogr step in progress for execid: {exec_id} resource published was: {instance_name}") + """ + logger.info( + f"Rollback dynamic model & ogr2ogr step in progress for execid: {exec_id} resource published was: {instance_name}" + ) schema = None if os.getenv("IMPORTER_ENABLE_DYN_MODELS", False): schema = ModelSchema.objects.filter(name=instance_name).first() if schema is not None: - _model_editor = ModelSchemaEditor(initial_model=instance_name, db_name=schema.db_name) + _model_editor = ModelSchemaEditor( + initial_model=instance_name, db_name=schema.db_name + ) _model_editor.drop_table(schema.as_model()) ModelSchema.objects.filter(name=instance_name).delete() elif schema is None: try: - logger.info("Dynamic model does not exists, removing ogr2ogr table in progress") + logger.info( + "Dynamic model does not exists, removing ogr2ogr table in progress" + ) if instance_name 
is None: logger.info("No table created, skipping...") return @@ -714,29 +817,39 @@ def _import_resource_rollback(self, exec_id, instance_name=None, *args, **kwargs logger.info(e) pass - def _publish_resource_rollback(self, exec_id, instance_name=None, *args, **kwargs): - ''' + def _publish_resource_rollback(self, exec_id, instance_name=None, *args, **kwargs): + """ We delete the resource from geoserver - ''' - logger.info(f"Rollback publishing step in progress for execid: {exec_id} resource published was: {instance_name}") + """ + logger.info( + f"Rollback publishing step in progress for execid: {exec_id} resource published was: {instance_name}" + ) exec_object = orchestrator.get_execution_object(exec_id) handler_module_path = exec_object.input_params.get("handler_module_path") publisher = DataPublisher(handler_module_path=handler_module_path) publisher.delete_resource(instance_name) - - def _create_geonode_resource_rollback(self, exec_id, instance_name=None, *args, **kwargs): - ''' + + def _create_geonode_resource_rollback( + self, exec_id, instance_name=None, *args, **kwargs + ): + """ The handler will remove the resource from geonode - ''' - logger.info(f"Rollback geonode step in progress for execid: {exec_id} resource created was: {instance_name}") + """ + logger.info( + f"Rollback geonode step in progress for execid: {exec_id} resource created was: {instance_name}" + ) resource = ResourceBase.objects.filter(alternate__icontains=instance_name) if resource.exists(): resource.delete() - - def _copy_dynamic_model_rollback(self, exec_id, instance_name=None, *args, **kwargs): + + def _copy_dynamic_model_rollback( + self, exec_id, instance_name=None, *args, **kwargs + ): self._import_resource_rollback(exec_id, instance_name=instance_name) - - def _copy_geonode_resource_rollback(self, exec_id, instance_name=None, *args, **kwargs): + + def _copy_geonode_resource_rollback( + self, exec_id, instance_name=None, *args, **kwargs + ): self._create_geonode_resource_rollback(exec_id, instance_name=instance_name) @@ -753,12 +866,13 @@ def import_next_step( actual_step: str, layer_name: str, alternate: str, - **kwargs: dict + **kwargs: dict, ): """ If the ingestion of the resource is successfuly, the next step for the layer is called """ from importer.celery_tasks import import_orchestrator + try: _exec = orchestrator.get_execution_object(execution_id) @@ -784,14 +898,13 @@ def import_next_step( layer=layer_name, alternate=alternate, error=e, - **kwargs + **kwargs, ) finally: return "import_next_step", alternate, execution_id - @importer_app.task( base=SingleMessageErrorHandler, name="importer.import_with_ogr2ogr", @@ -824,7 +937,12 @@ def import_with_ogr2ogr( process = Popen(" ".join(commands), stdout=PIPE, stderr=PIPE, shell=True) stdout, stderr = process.communicate() - if stderr is not None and stderr != b"" and b"ERROR" in stderr or b'Syntax error' in stderr: + if ( + stderr is not None + and stderr != b"" + and b"ERROR" in stderr + or b"Syntax error" in stderr + ): try: err = stderr.decode() except Exception: @@ -841,11 +959,13 @@ def import_with_ogr2ogr( layer=original_name, alternate=alternate, error=e, - **{} + **{}, ) raise Exception(e) def normalize_ogr2ogr_error(err, original_name): - getting_errors = [y for y in err.split('\n') if 'ERROR ' in y] - return ', '.join([x.split(original_name)[0] for x in getting_errors if 'ERROR' in x]) + getting_errors = [y for y in err.split("\n") if "ERROR " in y] + return ", ".join( + [x.split(original_name)[0] for x in getting_errors if "ERROR" in x] + ) 
diff --git a/importer/handlers/csv/handler.py b/importer/handlers/csv/handler.py index b199a86c..80ebf79d 100644 --- a/importer/handlers/csv/handler.py +++ b/importer/handlers/csv/handler.py @@ -42,14 +42,21 @@ class CSVFileHandler(BaseVectorFileHandler): ), } - possible_geometry_column_name = ['geom', 'geometry', 'wkt_geom', 'the_geom'] - possible_lat_column = ['latitude', 'lat', 'y'] - possible_long_column = ['longitude', 'long', 'x'] + possible_geometry_column_name = ["geom", "geometry", "wkt_geom", "the_geom"] + possible_lat_column = ["latitude", "lat", "y"] + possible_long_column = ["longitude", "long", "x"] possible_latlong_column = possible_lat_column + possible_long_column @property def supported_file_extension_config(self): - return {"id": "csv", "label": "CSV", "format": "vector", "mimeType": ["text/csv"], "ext": ["csv"], "optional": ["sld", "xml"]} + return { + "id": "csv", + "label": "CSV", + "format": "vector", + "mimeType": ["text/csv"], + "ext": ["csv"], + "optional": ["sld", "xml"], + } @staticmethod def can_handle(_data) -> bool: @@ -93,24 +100,31 @@ def is_valid(files, user): detail=f"With the provided CSV, the number of max parallel upload will exceed the limit of {max_upload}" ) - schema_keys = [ - x.name.lower() - for layer in layers - for x in layer.schema - ] - geom_is_in_schema = any(x in schema_keys for x in CSVFileHandler().possible_geometry_column_name) + schema_keys = [x.name.lower() for layer in layers for x in layer.schema] + geom_is_in_schema = any( + x in schema_keys for x in CSVFileHandler().possible_geometry_column_name + ) has_lat = any(x in CSVFileHandler().possible_lat_column for x in schema_keys) has_long = any(x in CSVFileHandler().possible_long_column for x in schema_keys) - fields = CSVFileHandler().possible_geometry_column_name + CSVFileHandler().possible_latlong_column + fields = ( + CSVFileHandler().possible_geometry_column_name + + CSVFileHandler().possible_latlong_column + ) if has_lat and not has_long: - raise InvalidCSVException(f"Longitude is missing. Supported names: {', '.join(CSVFileHandler().possible_long_column)}") + raise InvalidCSVException( + f"Longitude is missing. Supported names: {', '.join(CSVFileHandler().possible_long_column)}" + ) if not has_lat and has_long: - raise InvalidCSVException(f"Latitude is missing. Supported names: {', '.join(CSVFileHandler().possible_lat_column)}") + raise InvalidCSVException( + f"Latitude is missing. Supported names: {', '.join(CSVFileHandler().possible_lat_column)}" + ) if not geom_is_in_schema and not has_lat and not has_long: - raise InvalidCSVException(f"Not enough geometry field are set. The possibilities are: {','.join(fields)}") + raise InvalidCSVException( + f"Not enough geometry field are set. The possibilities are: {','.join(fields)}" + ) return True @@ -119,13 +133,18 @@ def get_ogr2ogr_driver(self): @staticmethod def create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate): - ''' + """ Define the ogr2ogr command to be executed. 
This is a default command that is needed to import a vector file - ''' - base_command = BaseVectorFileHandler.create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate) + """ + base_command = BaseVectorFileHandler.create_ogr2ogr_command( + files, original_name, ovverwrite_layer, alternate + ) additional_option = ' -oo "GEOM_POSSIBLE_NAMES=geom*,the_geom*,wkt_geom" -oo "X_POSSIBLE_NAMES=x,long*" -oo "Y_POSSIBLE_NAMES=y,lat*"' - return f"{base_command } -oo KEEP_GEOM_COLUMNS=NO -lco GEOMETRY_NAME={BaseVectorFileHandler().default_geometry_column_name} " + additional_option + return ( + f"{base_command } -oo KEEP_GEOM_COLUMNS=NO -lco GEOMETRY_NAME={BaseVectorFileHandler().default_geometry_column_name} " + + additional_option + ) def create_dynamic_model_fields( self, @@ -140,38 +159,54 @@ def create_dynamic_model_fields( {"name": x.name.lower(), "class_name": self._get_type(x), "null": True} for x in layer.schema ] - if layer.GetGeometryColumn() or self.default_geometry_column_name and ogr.GeometryTypeToName(layer.GetGeomType()) not in ['Geometry Collection', 'Unknown (any)']: + if ( + layer.GetGeometryColumn() + or self.default_geometry_column_name + and ogr.GeometryTypeToName(layer.GetGeomType()) + not in ["Geometry Collection", "Unknown (any)"] + ): # the geometry colum is not returned rom the layer.schema, so we need to extract it manually # checking if the geometry has been wrogly read as string - schema_keys = [x['name'] for x in layer_schema] - geom_is_in_schema = (x in schema_keys for x in self.possible_geometry_column_name) - if any(geom_is_in_schema) and layer.GetGeomType() == 100: # 100 means None so Geometry not found - field_name = [x for x in self.possible_geometry_column_name if x in schema_keys][0] + schema_keys = [x["name"] for x in layer_schema] + geom_is_in_schema = ( + x in schema_keys for x in self.possible_geometry_column_name + ) + if ( + any(geom_is_in_schema) and layer.GetGeomType() == 100 + ): # 100 means None so Geometry not found + field_name = [ + x for x in self.possible_geometry_column_name if x in schema_keys + ][0] index = layer.GetFeature(1).keys().index(field_name) geom = [x for x in layer.GetFeature(1)][index] class_name = GEOM_TYPE_MAPPING.get( - self.promote_to_multi( - geom.split("(")[0].replace(" ", "").title() - ) + self.promote_to_multi(geom.split("(")[0].replace(" ", "").title()) ) - layer_schema = [x for x in layer_schema if field_name not in x['name']] + layer_schema = [x for x in layer_schema if field_name not in x["name"]] elif any(x in self.possible_latlong_column for x in schema_keys): - class_name = GEOM_TYPE_MAPPING.get(self.promote_to_multi('Point')) + class_name = GEOM_TYPE_MAPPING.get(self.promote_to_multi("Point")) else: - class_name = GEOM_TYPE_MAPPING.get(self.promote_to_multi(ogr.GeometryTypeToName(layer.GetGeomType()))) + class_name = GEOM_TYPE_MAPPING.get( + self.promote_to_multi(ogr.GeometryTypeToName(layer.GetGeomType())) + ) layer_schema += [ { - "name": layer.GetGeometryColumn() or self.default_geometry_column_name, + "name": layer.GetGeometryColumn() + or self.default_geometry_column_name, "class_name": class_name, - "dim": 2 if not ogr.GeometryTypeToName(layer.GetGeomType()).lower().startswith('3d') else 3 + "dim": 2 + if not ogr.GeometryTypeToName(layer.GetGeomType()) + .lower() + .startswith("3d") + else 3, } ] # ones we have the schema, here we create a list of chunked value # so the async task will handle max of 30 field per task list_chunked = [ - layer_schema[i: i + 30] for i in range(0, len(layer_schema), 
30) + layer_schema[i : i + 30] for i in range(0, len(layer_schema), 30) ] # definition of the celery group needed to run the async workflow. @@ -185,12 +220,16 @@ def create_dynamic_model_fields( return dynamic_model_schema, celery_group - def extract_resource_to_publish(self, files, action, layer_name, alternate, **kwargs): + def extract_resource_to_publish( + self, files, action, layer_name, alternate, **kwargs + ): if action == exa.COPY.value: return [ { "name": alternate, - "crs": ResourceBase.objects.filter(alternate__istartswith=layer_name) + "crs": ResourceBase.objects.filter( + alternate__istartswith=layer_name + ) .first() .srid, } @@ -202,7 +241,9 @@ def extract_resource_to_publish(self, files, action, layer_name, alternate, **kw return [ { "name": alternate or layer_name, - "crs": self.identify_authority(_l) if _l.GetSpatialRef() else 'EPSG:4326' + "crs": self.identify_authority(_l) + if _l.GetSpatialRef() + else "EPSG:4326", } for _l in layers if self.fixup_name(_l.GetName()) == layer_name diff --git a/importer/handlers/csv/tests.py b/importer/handlers/csv/tests.py index d6977bfd..f886b956 100644 --- a/importer/handlers/csv/tests.py +++ b/importer/handlers/csv/tests.py @@ -29,9 +29,7 @@ def setUpClass(cls): cls.invalid_files = {"base_file": cls.invalid_csv} cls.valid_files = {"base_file": cls.valid_csv} cls.owner = get_user_model().objects.first() - cls.layer = create_single_dataset( - name="test", owner=cls.owner - ) + cls.layer = create_single_dataset(name="test", owner=cls.owner) def test_task_list_is_the_expected_one(self): expected = ( @@ -60,18 +58,18 @@ def test_is_valid_should_raise_exception_if_the_csv_is_invalid(self): self.assertIsNotNone(_exc) self.assertTrue( - "The CSV provided is invalid, no layers found" - in str(_exc.exception.detail) + "The CSV provided is invalid, no layers found" in str(_exc.exception.detail) ) def test_is_valid_should_raise_exception_if_the_csv_missing_geom(self): with self.assertRaises(InvalidCSVException) as _exc: - self.handler.is_valid(files={"base_file": self.missing_geom}, user=self.user) + self.handler.is_valid( + files={"base_file": self.missing_geom}, user=self.user + ) self.assertIsNotNone(_exc) self.assertTrue( - "Not enough geometry field are set" - in str(_exc.exception.detail) + "Not enough geometry field are set" in str(_exc.exception.detail) ) def test_is_valid_should_raise_exception_if_the_csv_missing_lat(self): @@ -79,20 +77,16 @@ def test_is_valid_should_raise_exception_if_the_csv_missing_lat(self): self.handler.is_valid(files={"base_file": self.missing_lat}, user=self.user) self.assertIsNotNone(_exc) - self.assertTrue( - "Latitude is missing" - in str(_exc.exception.detail) - ) + self.assertTrue("Latitude is missing" in str(_exc.exception.detail)) def test_is_valid_should_raise_exception_if_the_csv_missing_long(self): with self.assertRaises(InvalidCSVException) as _exc: - self.handler.is_valid(files={"base_file": self.missing_long}, user=self.user) + self.handler.is_valid( + files={"base_file": self.missing_long}, user=self.user + ) self.assertIsNotNone(_exc) - self.assertTrue( - "Longitude is missing" - in str(_exc.exception.detail) - ) + self.assertTrue("Longitude is missing" in str(_exc.exception.detail)) def test_is_valid_should_raise_exception_if_the_parallelism_is_met(self): parallelism, created = UploadParallelismLimit.objects.get_or_create( @@ -146,8 +140,10 @@ def test_can_handle_should_return_false_for_other_files(self): actual = self.handler.can_handle({"base_file": "random.file"}) self.assertFalse(actual) - 
@patch('importer.handlers.common.vector.Popen') - def test_import_with_ogr2ogr_without_errors_should_call_the_right_command(self, _open): + @patch("importer.handlers.common.vector.Popen") + def test_import_with_ogr2ogr_without_errors_should_call_the_right_command( + self, _open + ): _uuid = uuid.uuid4() comm = MagicMock() @@ -160,14 +156,17 @@ def test_import_with_ogr2ogr_without_errors_should_call_the_right_command(self, original_name="dataset", handler_module_path=str(self.handler), ovverwrite_layer=False, - alternate="alternate" + alternate="alternate", ) - self.assertEqual('ogr2ogr', _task) + self.assertEqual("ogr2ogr", _task) self.assertEqual(alternate, "alternate") self.assertEqual(str(_uuid), execution_id) _open.assert_called_once() _open.assert_called_with( - f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_csv}" -lco DIM=2 -nln alternate "dataset" -oo KEEP_GEOM_COLUMNS=NO -lco GEOMETRY_NAME=geometry -oo "GEOM_POSSIBLE_NAMES=geom*,the_geom*,wkt_geom" -oo "X_POSSIBLE_NAMES=x,long*" -oo "Y_POSSIBLE_NAMES=y,lat*"', stdout=-1, stderr=-1, shell=True # noqa + f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_csv}" -lco DIM=2 -nln alternate "dataset" -oo KEEP_GEOM_COLUMNS=NO -lco GEOMETRY_NAME=geometry -oo "GEOM_POSSIBLE_NAMES=geom*,the_geom*,wkt_geom" -oo "X_POSSIBLE_NAMES=x,long*" -oo "Y_POSSIBLE_NAMES=y,lat*"', + stdout=-1, + stderr=-1, + shell=True, # noqa ) diff --git a/importer/handlers/geojson/tests.py b/importer/handlers/geojson/tests.py index 39ddf748..19fda437 100644 --- a/importer/handlers/geojson/tests.py +++ b/importer/handlers/geojson/tests.py @@ -1,4 +1,3 @@ - import uuid from django.test import TestCase from mock import MagicMock, patch @@ -35,7 +34,7 @@ def test_task_list_is_the_expected_one(self): "start_import", "importer.import_resource", "importer.publish_resource", - "importer.create_geonode_resource" + "importer.create_geonode_resource", ) self.assertEqual(len(self.handler.ACTIONS["import"]), 4) self.assertTupleEqual(expected, self.handler.ACTIONS["import"]) @@ -106,8 +105,10 @@ def test_can_handle_should_return_false_for_other_files(self): actual = self.handler.can_handle({"base_file": "random.gpkg"}) self.assertFalse(actual) - @patch('importer.handlers.common.vector.Popen') - def test_import_with_ogr2ogr_without_errors_should_call_the_right_command(self, _open): + @patch("importer.handlers.common.vector.Popen") + def test_import_with_ogr2ogr_without_errors_should_call_the_right_command( + self, _open + ): _uuid = uuid.uuid4() comm = MagicMock() @@ -120,14 +121,17 @@ def test_import_with_ogr2ogr_without_errors_should_call_the_right_command(self, original_name="dataset", handler_module_path=str(self.handler), ovverwrite_layer=False, - alternate="alternate" + alternate="alternate", ) - self.assertEqual('ogr2ogr', _task) + self.assertEqual("ogr2ogr", _task) self.assertEqual(alternate, "alternate") self.assertEqual(str(_uuid), execution_id) _open.assert_called_once() _open.assert_called_with( - f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_files.get("base_file")}" -lco DIM=2 -nln alternate "dataset" -lco GEOMETRY_NAME=geometry', stdout=-1, stderr=-1, shell=True # noqa + f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f 
PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_files.get("base_file")}" -lco DIM=2 -nln alternate "dataset" -lco GEOMETRY_NAME=geometry', + stdout=-1, + stderr=-1, + shell=True, # noqa ) diff --git a/importer/handlers/geotiff/handler.py b/importer/handlers/geotiff/handler.py index 67a4b199..5c6ae0db 100644 --- a/importer/handlers/geotiff/handler.py +++ b/importer/handlers/geotiff/handler.py @@ -38,12 +38,12 @@ class GeoTiffFileHandler(BaseRasterFileHandler): @property def supported_file_extension_config(self): return { - "id": 'tiff', - "label": 'GeoTIFF', - "format": 'raster', - "ext": ['tiff', 'tif', 'geotiff', 'geotif'], - "mimeType": ['image/tiff'], - "optional": ['xml', 'sld'] + "id": "tiff", + "label": "GeoTIFF", + "format": "raster", + "ext": ["tiff", "tif", "geotiff", "geotif"], + "mimeType": ["image/tiff"], + "optional": ["xml", "sld"], } @staticmethod @@ -56,7 +56,7 @@ def can_handle(_data) -> bool: if not base: return False ext = base.split(".")[-1] if isinstance(base, str) else base.name.split(".")[-1] - return ext in ["tiff", "geotiff", "tif", 'geotif'] + return ext in ["tiff", "geotiff", "tif", "geotif"] @staticmethod def is_valid(files, user): diff --git a/importer/handlers/geotiff/tests.py b/importer/handlers/geotiff/tests.py index b6254cb3..59dbe78a 100644 --- a/importer/handlers/geotiff/tests.py +++ b/importer/handlers/geotiff/tests.py @@ -1,4 +1,3 @@ - from django.test import TestCase from importer.handlers.geotiff.exceptions import InvalidGeoTiffException from django.contrib.auth import get_user_model @@ -22,16 +21,14 @@ def setUpClass(cls): cls.user, _ = get_user_model().objects.get_or_create(username="admin") cls.invalid_tiff = {"base_file": "invalid.file.foo"} cls.owner = get_user_model().objects.first() - cls.layer = create_single_dataset( - name="test_grid", owner=cls.owner - ) + cls.layer = create_single_dataset(name="test_grid", owner=cls.owner) def test_task_list_is_the_expected_one(self): expected = ( "start_import", "importer.import_resource", "importer.publish_resource", - "importer.create_geonode_resource" + "importer.create_geonode_resource", ) self.assertEqual(len(self.handler.ACTIONS["import"]), 4) self.assertTupleEqual(expected, self.handler.ACTIONS["import"]) @@ -41,7 +38,7 @@ def test_task_list_is_the_expected_one_copy(self): "start_copy", "importer.copy_raster_file", "importer.publish_resource", - "importer.copy_geonode_resource" + "importer.copy_geonode_resource", ) self.assertEqual(len(self.handler.ACTIONS["copy"]), 4) self.assertTupleEqual(expected, self.handler.ACTIONS["copy"]) @@ -85,7 +82,8 @@ def test_is_valid_should_raise_exception_if_the_tif_is_invalid_format(self): self.assertIsNotNone(_exc) self.assertTrue( - "Please remove the additional dots in the filename" in str(_exc.exception.detail) + "Please remove the additional dots in the filename" + in str(_exc.exception.detail) ) def test_is_valid_should_raise_exception_if_the_tif_not_provided(self): @@ -93,9 +91,7 @@ def test_is_valid_should_raise_exception_if_the_tif_not_provided(self): self.handler.is_valid(files={"foo": "bar"}, user=self.user) self.assertIsNotNone(_exc) - self.assertTrue( - "base file is not provided" in str(_exc.exception.detail) - ) + self.assertTrue("base file is not provided" in str(_exc.exception.detail)) def test_can_handle_should_return_true_for_tif(self): actual = self.handler.can_handle(self.valid_files) diff --git a/importer/handlers/gpkg/handler.py b/importer/handlers/gpkg/handler.py index 
c51ceec3..303b2017 100644 --- a/importer/handlers/gpkg/handler.py +++ b/importer/handlers/gpkg/handler.py @@ -41,7 +41,12 @@ class GPKGFileHandler(BaseVectorFileHandler): @property def supported_file_extension_config(self): - return {"id": "gpkg", "label": "GeoPackage", "format": "archive", "ext": ["gpkg"]} + return { + "id": "gpkg", + "label": "GeoPackage", + "format": "archive", + "ext": ["gpkg"], + } @staticmethod def can_handle(_data) -> bool: @@ -109,14 +114,12 @@ def is_valid(files, user): error_to_raise = [] for error in validator[0]: logger.error(error) - if 'locations' in error: - error_to_raise.extend(error['locations']) + if "locations" in error: + error_to_raise.extend(error["locations"]) else: - error_to_raise.append(error['validation_description']) + error_to_raise.append(error["validation_description"]) - raise InvalidGeopackageException( - '. '.join(error_to_raise) - ) + raise InvalidGeopackageException(". ".join(error_to_raise)) return True diff --git a/importer/handlers/gpkg/tasks.py b/importer/handlers/gpkg/tasks.py index 948fbdd9..3ffc0bb9 100644 --- a/importer/handlers/gpkg/tasks.py +++ b/importer/handlers/gpkg/tasks.py @@ -8,15 +8,14 @@ class SingleMessageErrorHandler(Task): - max_retries = 1 track_started = True def on_failure(self, exc, task_id, args, kwargs, einfo): - ''' + """ THis is separated because for gpkg we have a side effect (we rollback dynamic models and ogr2ogr) based on this failure step which is not meant for the other handlers - ''' + """ evaluate_error(self, exc, task_id, args, kwargs, einfo) diff --git a/importer/handlers/gpkg/tests.py b/importer/handlers/gpkg/tests.py index e609da99..cb36e6b9 100644 --- a/importer/handlers/gpkg/tests.py +++ b/importer/handlers/gpkg/tests.py @@ -114,7 +114,6 @@ def test_can_handle_should_return_false_for_other_files(self): self.assertFalse(actual) def test_single_message_error_handler(self): - exec_id = orchestrator.create_execution_request( user=get_user_model().objects.first(), func_name="funct1", diff --git a/importer/handlers/kml/handler.py b/importer/handlers/kml/handler.py index 88927f1b..22e0dba1 100644 --- a/importer/handlers/kml/handler.py +++ b/importer/handlers/kml/handler.py @@ -41,7 +41,12 @@ class KMLFileHandler(BaseVectorFileHandler): @property def supported_file_extension_config(self): - return {"id": "kml", "label": "KML/KMZ", "format": "archive", "ext": ["kml", "kmz"]} + return { + "id": "kml", + "label": "KML/KMZ", + "format": "archive", + "ext": ["kml", "kmz"], + } @staticmethod def can_handle(_data) -> bool: diff --git a/importer/handlers/kml/tests.py b/importer/handlers/kml/tests.py index 373dfb6f..17c25db8 100644 --- a/importer/handlers/kml/tests.py +++ b/importer/handlers/kml/tests.py @@ -22,9 +22,7 @@ def setUpClass(cls): cls.invalid_files = {"base_file": cls.invalid_kml} cls.valid_files = {"base_file": cls.valid_kml} cls.owner = get_user_model().objects.first() - cls.layer = create_single_dataset( - name="extruded_polygon", owner=cls.owner - ) + cls.layer = create_single_dataset(name="extruded_polygon", owner=cls.owner) def test_task_list_is_the_expected_one(self): expected = ( diff --git a/importer/handlers/shapefile/handler.py b/importer/handlers/shapefile/handler.py index ef6bf3ef..0cdf3583 100644 --- a/importer/handlers/shapefile/handler.py +++ b/importer/handlers/shapefile/handler.py @@ -32,7 +32,7 @@ class ShapeFileHandler(BaseVectorFileHandler): "importer.copy_dynamic_model", "importer.copy_geonode_data_table", "importer.publish_resource", - "importer.copy_geonode_resource" + 
"importer.copy_geonode_resource", ), ira.ROLLBACK.value: ( "start_rollback", @@ -43,12 +43,12 @@ class ShapeFileHandler(BaseVectorFileHandler): @property def supported_file_extension_config(self): return { - "id": 'shp', - "label": 'ESRI Shapefile', - "format": 'vector', - "ext": ['shp'], - "requires": ['shp', 'prj', 'dbf', 'shx'], - "optional": ['xml', 'sld'] + "id": "shp", + "label": "ESRI Shapefile", + "format": "vector", + "ext": ["shp"], + "requires": ["shp", "prj", "dbf", "shx"], + "optional": ["xml", "sld"], } @staticmethod @@ -68,7 +68,11 @@ def has_serializer(data) -> bool: _base = data.get("base_file") if not _base: return False - if _base.endswith("shp") if isinstance(_base, str) else _base.name.endswith("shp"): + if ( + _base.endswith("shp") + if isinstance(_base, str) + else _base.name.endswith("shp") + ): return ShapeFileSerializer return False @@ -111,14 +115,16 @@ def is_valid(files, user): if x["id"] == "shp" ][0] - ''' + """ Check if the ext required for the shape file are available in the files uploaded by the user - ''' + """ is_valid = all( map( lambda x: any( - _ext.endswith(f"{_filename}.{x}") if isinstance(_ext, str) else _ext.name.endswith(f"{_filename}.{x}") + _ext.endswith(f"{_filename}.{x}") + if isinstance(_ext, str) + else _ext.name.endswith(f"{_filename}.{x}") for _ext in files.values() ), _shp_ext_needed, @@ -136,22 +142,32 @@ def get_ogr2ogr_driver(self): @staticmethod def create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate): - ''' + """ Define the ogr2ogr command to be executed. This is a default command that is needed to import a vector file - ''' - base_command = BaseVectorFileHandler.create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate) + """ + base_command = BaseVectorFileHandler.create_ogr2ogr_command( + files, original_name, ovverwrite_layer, alternate + ) layers = ogr.Open(files.get("base_file")) layer = layers.GetLayer(original_name) - additional_option = " -nlt PROMOTE_TO_MULTI" if layer is not None and 'Point' not in ogr.GeometryTypeToName(layer.GetGeomType()) else " " - return f"{base_command } -lco precision=no -lco DIM=2 -lco GEOMETRY_NAME={BaseVectorFileHandler().default_geometry_column_name}" + additional_option + additional_option = ( + " -nlt PROMOTE_TO_MULTI" + if layer is not None + and "Point" not in ogr.GeometryTypeToName(layer.GetGeomType()) + else " " + ) + return ( + f"{base_command } -lco precision=no -lco DIM=2 -lco GEOMETRY_NAME={BaseVectorFileHandler().default_geometry_column_name}" + + additional_option + ) def promote_to_multi(self, geometry_name): - ''' + """ If needed change the name of the geometry, by promoting it to Multi example if is Point -> MultiPoint Needed for the shapefiles - ''' - if 'Multi' not in geometry_name and 'Point' not in geometry_name: + """ + if "Multi" not in geometry_name and "Point" not in geometry_name: return f"Multi {geometry_name.title()}" return geometry_name diff --git a/importer/handlers/shapefile/serializer.py b/importer/handlers/shapefile/serializer.py index 190b5c89..cbb9920a 100644 --- a/importer/handlers/shapefile/serializer.py +++ b/importer/handlers/shapefile/serializer.py @@ -1,4 +1,3 @@ - from rest_framework import serializers from dynamic_rest.serializers import DynamicModelSerializer from geonode.upload.models import Upload @@ -6,13 +5,19 @@ class ShapeFileSerializer(DynamicModelSerializer): class Meta: - ref_name = 'ShapeFileSerializer' + ref_name = "ShapeFileSerializer" model = Upload view_name = "importer_upload" fields = ( - "base_file", 
"dbf_file", "shx_file", "prj_file", "xml_file", - "sld_file", "store_spatial_files", "overwrite_existing_layer", - "skip_existing_layers" + "base_file", + "dbf_file", + "shx_file", + "prj_file", + "xml_file", + "sld_file", + "store_spatial_files", + "overwrite_existing_layer", + "skip_existing_layers", ) base_file = serializers.FileField() diff --git a/importer/handlers/shapefile/tests.py b/importer/handlers/shapefile/tests.py index 55b9bd95..6038c59e 100644 --- a/importer/handlers/shapefile/tests.py +++ b/importer/handlers/shapefile/tests.py @@ -1,4 +1,3 @@ - import os import uuid @@ -41,10 +40,10 @@ def test_task_list_is_the_expected_one(self): "start_import", "importer.import_resource", "importer.publish_resource", - "importer.create_geonode_resource" + "importer.create_geonode_resource", ) - self.assertEqual(len(self.handler.ACTIONS['import']), 4) - self.assertTupleEqual(expected, self.handler.ACTIONS['import']) + self.assertEqual(len(self.handler.ACTIONS["import"]), 4) + self.assertTupleEqual(expected, self.handler.ACTIONS["import"]) def test_copy_task_list_is_the_expected_one(self): expected = ( @@ -54,14 +53,18 @@ def test_copy_task_list_is_the_expected_one(self): "importer.publish_resource", "importer.copy_geonode_resource", ) - self.assertEqual(len(self.handler.ACTIONS['copy']), 5) - self.assertTupleEqual(expected, self.handler.ACTIONS['copy']) + self.assertEqual(len(self.handler.ACTIONS["copy"]), 5) + self.assertTupleEqual(expected, self.handler.ACTIONS["copy"]) def test_is_valid_should_raise_exception_if_the_parallelism_is_met(self): - parallelism, created = UploadParallelismLimit.objects.get_or_create(slug="default_max_parallel_uploads") + parallelism, created = UploadParallelismLimit.objects.get_or_create( + slug="default_max_parallel_uploads" + ) old_value = parallelism.max_number try: - UploadParallelismLimit.objects.filter(slug="default_max_parallel_uploads").update(max_number=0) + UploadParallelismLimit.objects.filter( + slug="default_max_parallel_uploads" + ).update(max_number=0) with self.assertRaises(UploadParallelismLimitException): self.handler.is_valid(files=self.valid_shp, user=self.user) @@ -71,18 +74,18 @@ def test_is_valid_should_raise_exception_if_the_parallelism_is_met(self): def test_promote_to_multi(self): # point should be keep as point - actual = self.handler.promote_to_multi('Point') + actual = self.handler.promote_to_multi("Point") self.assertEqual("Point", actual) # polygon should be changed into multipolygon - actual = self.handler.promote_to_multi('Polygon') + actual = self.handler.promote_to_multi("Polygon") self.assertEqual("Multi Polygon", actual) # linestring should be changed into multilinestring - actual = self.handler.promote_to_multi('Linestring') + actual = self.handler.promote_to_multi("Linestring") self.assertEqual("Multi Linestring", actual) # if is already multi should be kept - actual = self.handler.promote_to_multi('Multi Point') + actual = self.handler.promote_to_multi("Multi Point") self.assertEqual("Multi Point", actual) def test_is_valid_should_pass_with_valid_shp(self): @@ -109,8 +112,10 @@ def test_should_NOT_get_the_specific_serializer(self): actual = self.handler.has_serializer(self.invalid_files) self.assertFalse(actual) - @patch('importer.handlers.common.vector.Popen') - def test_import_with_ogr2ogr_without_errors_should_call_the_right_command(self, _open): + @patch("importer.handlers.common.vector.Popen") + def test_import_with_ogr2ogr_without_errors_should_call_the_right_command( + self, _open + ): _uuid = uuid.uuid4() comm = 
MagicMock() @@ -123,14 +128,17 @@ def test_import_with_ogr2ogr_without_errors_should_call_the_right_command(self, original_name="dataset", handler_module_path=str(self.handler), ovverwrite_layer=False, - alternate="alternate" + alternate="alternate", ) - self.assertEqual('ogr2ogr', _task) + self.assertEqual("ogr2ogr", _task) self.assertEqual(alternate, "alternate") self.assertEqual(str(_uuid), execution_id) _open.assert_called_once() _open.assert_called_with( - f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_shp.get("base_file")}" -lco DIM=2 -nln alternate "dataset" -lco precision=no -lco GEOMETRY_NAME=geometry ', stdout=-1, stderr=-1, shell=True # noqa + f'/usr/bin/ogr2ogr --config PG_USE_COPY YES -f PostgreSQL PG:" dbname=\'geonode_data\' host=localhost port=5434 user=\'geonode\' password=\'geonode\' " "{self.valid_shp.get("base_file")}" -lco DIM=2 -nln alternate "dataset" -lco precision=no -lco GEOMETRY_NAME=geometry ', + stdout=-1, + stderr=-1, + shell=True, # noqa ) diff --git a/importer/handlers/utils.py b/importer/handlers/utils.py index 9f4fdb7d..034fe074 100644 --- a/importer/handlers/utils.py +++ b/importer/handlers/utils.py @@ -96,9 +96,9 @@ def get_uuid(_list): def evaluate_error(celery_task, exc, task_id, args, kwargs, einfo): - ''' + """ Main error function used by the task for the "on_failure" function - ''' + """ from importer.celery_tasks import orchestrator exec_id = orchestrator.get_execution_object(exec_id=get_uuid(args)) @@ -119,7 +119,7 @@ def evaluate_error(celery_task, exc, task_id, args, kwargs, einfo): output_params.get("errors").append(_log) output_params.get("failed_layers", []).append(args[-1] if args else []) failed = list(set(output_params.get("failed_layers", []))) - output_params['failed_layers'] = failed + output_params["failed_layers"] = failed else: output_params = {"errors": [_log], "failed_layers": [args[-1]]} @@ -129,11 +129,9 @@ def evaluate_error(celery_task, exc, task_id, args, kwargs, einfo): meta={"exec_id": str(exec_id.exec_id), "reason": _log}, ) orchestrator.update_execution_request_status( - execution_id=str(exec_id.exec_id), - output_params=output_params + execution_id=str(exec_id.exec_id), output_params=output_params ) orchestrator.evaluate_execution_progress( - get_uuid(args), - _log=str(exc.detail if hasattr(exc, "detail") else exc.args[0]) + get_uuid(args), _log=str(exc.detail if hasattr(exc, "detail") else exc.args[0]) ) diff --git a/importer/migrations/0001_initial.py b/importer/migrations/0001_initial.py index 3fa86838..d49e37c4 100644 --- a/importer/migrations/0001_initial.py +++ b/importer/migrations/0001_initial.py @@ -5,7 +5,6 @@ class Migration(migrations.Migration): - initial = True dependencies = [ diff --git a/importer/migrations/0002_resourcehandlerinfo_kwargs.py b/importer/migrations/0002_resourcehandlerinfo_kwargs.py index 28743a00..113281f3 100644 --- a/importer/migrations/0002_resourcehandlerinfo_kwargs.py +++ b/importer/migrations/0002_resourcehandlerinfo_kwargs.py @@ -4,15 +4,17 @@ class Migration(migrations.Migration): - dependencies = [ - ('importer', '0001_initial'), + ("importer", "0001_initial"), ] operations = [ migrations.AddField( - model_name='resourcehandlerinfo', - name='kwargs', - field=models.JSONField(default=dict, verbose_name='Storing strictly related information of the handler'), + model_name="resourcehandlerinfo", + name="kwargs", + field=models.JSONField( + default=dict, + 
verbose_name="Storing strictly related information of the handler", + ), ), ] diff --git a/importer/migrations/0003_resourcehandlerinfo_execution_id.py b/importer/migrations/0003_resourcehandlerinfo_execution_id.py index c0791a3e..1ea22d22 100644 --- a/importer/migrations/0003_resourcehandlerinfo_execution_id.py +++ b/importer/migrations/0003_resourcehandlerinfo_execution_id.py @@ -5,16 +5,20 @@ class Migration(migrations.Migration): - dependencies = [ - ('resource', '0007_alter_executionrequest_action'), - ('importer', '0002_resourcehandlerinfo_kwargs'), + ("resource", "0007_alter_executionrequest_action"), + ("importer", "0002_resourcehandlerinfo_kwargs"), ] operations = [ migrations.AddField( - model_name='resourcehandlerinfo', - name='execution_id', - field=models.ForeignKey(default=None, null=True, on_delete=django.db.models.deletion.SET_NULL, to='resource.executionrequest'), + model_name="resourcehandlerinfo", + name="execution_id", + field=models.ForeignKey( + default=None, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to="resource.executionrequest", + ), ), ] diff --git a/importer/migrations/0004_rename_execution_id_resourcehandlerinfo_execution_request.py b/importer/migrations/0004_rename_execution_id_resourcehandlerinfo_execution_request.py index 4365d326..7b9a7443 100644 --- a/importer/migrations/0004_rename_execution_id_resourcehandlerinfo_execution_request.py +++ b/importer/migrations/0004_rename_execution_id_resourcehandlerinfo_execution_request.py @@ -4,15 +4,14 @@ class Migration(migrations.Migration): - dependencies = [ - ('importer', '0003_resourcehandlerinfo_execution_id'), + ("importer", "0003_resourcehandlerinfo_execution_id"), ] operations = [ migrations.RenameField( - model_name='resourcehandlerinfo', - old_name='execution_id', - new_name='execution_request', + model_name="resourcehandlerinfo", + old_name="execution_id", + new_name="execution_request", ), ] diff --git a/importer/migrations/0005_fixup_dynamic_shema_table_names.py b/importer/migrations/0005_fixup_dynamic_shema_table_names.py index 0c96882f..e216c8ae 100644 --- a/importer/migrations/0005_fixup_dynamic_shema_table_names.py +++ b/importer/migrations/0005_fixup_dynamic_shema_table_names.py @@ -1,35 +1,34 @@ from django.db import migrations import logging from django.db import ProgrammingError + logger = logging.getLogger(__name__) def fixup_table_name(apps, schema_editor): try: - schema = apps.get_model('dynamic_models', 'ModelSchema') + schema = apps.get_model("dynamic_models", "ModelSchema") for val in schema.objects.all(): if val.name != val.db_table_name: val.db_table_name = val.name val.save() except ProgrammingError as e: - ''' + """ The dynamic model should exists to apply the above migration. 
In case it does not exists we can skip it - ''' + """ if 'relation "dynamic_models_modelschema" does not exist' in e.args[0]: logging.debug("Dynamic model does not exists yet, skipping") return raise e except Exception as e: raise e - class Migration(migrations.Migration): - dependencies = [ - ('importer', '0004_rename_execution_id_resourcehandlerinfo_execution_request'), - ('dynamic_models', '0005_auto_20220621_0718'), + ("importer", "0004_rename_execution_id_resourcehandlerinfo_execution_request"), + ("dynamic_models", "0005_auto_20220621_0718"), ] operations = [ diff --git a/importer/migrations/0006_dataset_migration.py b/importer/migrations/0006_dataset_migration.py index 651965f3..768d6a49 100644 --- a/importer/migrations/0006_dataset_migration.py +++ b/importer/migrations/0006_dataset_migration.py @@ -4,11 +4,12 @@ from importer.orchestrator import orchestrator from geonode.layers.models import Dataset + def dataset_migration(apps, _): - NewResources = apps.get_model('importer', 'ResourceHandlerInfo') - for old_resource in Dataset.objects\ - .exclude(pk__in=NewResources.objects.values_list('resource_id', flat=True))\ - .exclude(subtype__in=['remote', None]): + NewResources = apps.get_model("importer", "ResourceHandlerInfo") + for old_resource in Dataset.objects.exclude( + pk__in=NewResources.objects.values_list("resource_id", flat=True) + ).exclude(subtype__in=["remote", None]): # generating orchestrator expected data file if not old_resource.files: if old_resource.is_vector(): @@ -29,14 +30,13 @@ def dataset_migration(apps, _): handler_module_path=str(handler_to_use), resource=old_resource, execution_id=None, - kwargs={"is_legacy": True} + kwargs={"is_legacy": True}, ) class Migration(migrations.Migration): - dependencies = [ - ('importer', '0005_fixup_dynamic_shema_table_names'), + ("importer", "0005_fixup_dynamic_shema_table_names"), ] operations = [ diff --git a/importer/models.py b/importer/models.py index e387eb9a..a02c3f25 100644 --- a/importer/models.py +++ b/importer/models.py @@ -35,7 +35,13 @@ class ResourceHandlerInfo(models.Model): Here we save the relation between the geonode resource created and the handler that created that resource """ - resource = models.ForeignKey(ResourceBase, blank=False, null=False, on_delete=models.CASCADE) + resource = models.ForeignKey( + ResourceBase, blank=False, null=False, on_delete=models.CASCADE + ) handler_module_path = models.CharField(max_length=250, blank=False, null=False) - execution_request = models.ForeignKey(ExecutionRequest, null=True, default=None, on_delete=models.SET_NULL) - kwargs = models.JSONField(verbose_name="Storing strictly related information of the handler", default=dict) + execution_request = models.ForeignKey( + ExecutionRequest, null=True, default=None, on_delete=models.SET_NULL + ) + kwargs = models.JSONField( + verbose_name="Storing strictly related information of the handler", default=dict + ) diff --git a/importer/orchestrator.py b/importer/orchestrator.py index a2109355..46eb2ec6 100644 --- a/importer/orchestrator.py +++ b/importer/orchestrator.py @@ -10,8 +10,7 @@ from django.utils import timezone from django.utils.module_loading import import_string from django_celery_results.models import TaskResult -from geonode.base.enumerations import (STATE_INVALID, STATE_PROCESSED, - STATE_RUNNING) +from geonode.base.enumerations import STATE_INVALID, STATE_PROCESSED, STATE_RUNNING from geonode.resource.models import ExecutionRequest from geonode.upload.models import Upload from rest_framework import serializers @@ 
-113,7 +112,9 @@ def perform_next_step( remaining_tasks = tasks[_index:] if not _index >= len(tasks) else [] if not remaining_tasks: # The list of task is empty, it means that the process is finished - self.evaluate_execution_progress(execution_id, handler_module_path=handler_module_path) + self.evaluate_execution_progress( + execution_id, handler_module_path=handler_module_path + ) return # getting the next step to perform next_step = next(iter(remaining_tasks)) @@ -194,7 +195,9 @@ def set_as_completed(self, execution_id): legacy_status=STATE_PROCESSED, ) - def evaluate_execution_progress(self, execution_id, _log=None, handler_module_path=None): + def evaluate_execution_progress( + self, execution_id, _log=None, handler_module_path=None + ): from importer.models import ResourceHandlerInfo """ @@ -205,7 +208,9 @@ def evaluate_execution_progress(self, execution_id, _log=None, handler_module_pa _exec = self.get_execution_object(execution_id) expected_dataset = _exec.input_params.get("total_layers", 0) - actual_dataset = ResourceHandlerInfo.objects.filter(execution_request=_exec).count() + actual_dataset = ResourceHandlerInfo.objects.filter( + execution_request=_exec + ).count() is_last_dataset = actual_dataset >= expected_dataset execution_id = str(execution_id) # force it as string to be sure lower_exec_id = execution_id.replace("-", "_").lower() @@ -217,7 +222,9 @@ def evaluate_execution_progress(self, execution_id, _log=None, handler_module_pa | Q(task_kwargs__icontains=execution_id) | Q(result__icontains=execution_id) ) - _has_data = ResourceHandlerInfo.objects.filter(execution_request__exec_id=execution_id).exists() + _has_data = ResourceHandlerInfo.objects.filter( + execution_request__exec_id=execution_id + ).exists() # .all() is needed since we want to have the last status on the DB without take in consideration the cache if ( @@ -225,7 +232,9 @@ def evaluate_execution_progress(self, execution_id, _log=None, handler_module_pa .exclude(Q(status=states.SUCCESS) | Q(status=states.FAILURE)) .exists() ): - self._evaluate_last_dataset(is_last_dataset, _log, execution_id, handler_module_path) + self._evaluate_last_dataset( + is_last_dataset, _log, execution_id, handler_module_path + ) elif exec_result.all().filter(status=states.FAILURE).exists(): """ Should set it fail if all the execution are done and at least 1 is failed @@ -233,30 +242,32 @@ def evaluate_execution_progress(self, execution_id, _log=None, handler_module_pa # failed = [x.task_id for x in exec_result.filter(status=states.FAILURE)] # _log_message = f"For the execution ID {execution_id} The following celery task are failed: {failed}" if _has_data: - log = list(set(self.get_execution_object(execution_id).output_params.get("failed_layers", ['Unknown']))) - logger.error(log) - self.set_as_partially_failed( - execution_id=execution_id, reason=log + log = list( + set( + self.get_execution_object(execution_id).output_params.get( + "failed_layers", ["Unknown"] + ) + ) ) + logger.error(log) + self.set_as_partially_failed(execution_id=execution_id, reason=log) self._last_step(execution_id, handler_module_path) elif is_last_dataset: - self.set_as_failed( - execution_id=execution_id, reason=_log - ) + self.set_as_failed(execution_id=execution_id, reason=_log) elif expected_dataset == 1 and not _has_data: - self.set_as_failed( - execution_id=execution_id, reason=_log - ) + self.set_as_failed(execution_id=execution_id, reason=_log) else: - self._evaluate_last_dataset(is_last_dataset, _log, execution_id, handler_module_path) + 
self._evaluate_last_dataset( + is_last_dataset, _log, execution_id, handler_module_path + ) - def _evaluate_last_dataset(self, is_last_dataset, _log, execution_id, handler_module_path): + def _evaluate_last_dataset( + self, is_last_dataset, _log, execution_id, handler_module_path + ): if is_last_dataset: - if _log and 'ErrorDetail' in _log: - self.set_as_failed( - execution_id=execution_id, reason=_log - ) + if _log and "ErrorDetail" in _log: + self.set_as_failed(execution_id=execution_id, reason=_log) else: logger.info( f"Execution with ID {execution_id} is completed. All tasks are done" @@ -279,7 +290,7 @@ def create_execution_request( legacy_upload_name="", action=None, name=None, - source=None + source=None, ) -> UUID: """ Create an execution request for the user. Return the UUID of the request @@ -292,7 +303,7 @@ def create_execution_request( input_params=input_params, action=action, name=name, - source=source + source=source, ) if self.enable_legacy_upload_status: # getting the package name from the base_filename @@ -341,10 +352,10 @@ def update_execution_request_status( ) def _last_step(self, execution_id, handler_module_path): - ''' + """ Last hookable step for each handler before mark the execution as completed To overwrite this, please hook the method perform_last_step from the Handler - ''' + """ if not handler_module_path: return return self.load_handler(handler_module_path).perform_last_step(execution_id) diff --git a/importer/publisher.py b/importer/publisher.py index 7a103cfc..05da007c 100644 --- a/importer/publisher.py +++ b/importer/publisher.py @@ -50,7 +50,9 @@ def extract_resource_to_publish( def get_resource(self, resource_name) -> bool: self.get_or_create_store() - _res = self.cat.get_resource(resource_name, store=self.store, workspace=self.workspace) + _res = self.cat.get_resource( + resource_name, store=self.store, workspace=self.workspace + ) return True if _res else False def publish_resources(self, resources: List[str]): @@ -65,19 +67,19 @@ def publish_resources(self, resources: List[str]): store=self.store, workspace=self.workspace, ) - + def delete_resource(self, resource_name): layer = self.get_resource(resource_name) if layer: - self.cat.delete(layer.resource, purge='all', recurse=True) - + self.cat.delete(layer.resource, purge="all", recurse=True) + def get_resource(self, dataset_name): return self.cat.get_layer(dataset_name) def overwrite_resources(self, resources: List[str]): - ''' + """ Not available for now, waiting geoserver 2.20/2.21 available with Geonode - ''' + """ pass def get_or_create_store(self): @@ -92,7 +94,9 @@ def get_or_create_store(self): store_name=geodatabase, workspace=self.workspace.name ) - def publish_geoserver_view(self, layer_name, crs, view_name, sql=None, geometry=None): + def publish_geoserver_view( + self, layer_name, crs, view_name, sql=None, geometry=None + ): """ Let the handler create a geoserver view given the input parameters """ @@ -106,5 +110,5 @@ def publish_geoserver_view(self, layer_name, crs, view_name, sql=None, geometry= crs=crs, view_name=view_name, sql=sql, - geometry=geometry + geometry=geometry, ) diff --git a/importer/tests/end2end/test_end2end.py b/importer/tests/end2end/test_end2end.py index 1490382c..231e1c57 100644 --- a/importer/tests/end2end/test_end2end.py +++ b/importer/tests/end2end/test_end2end.py @@ -15,11 +15,11 @@ from importer import project_dir from importer.tests.utils import ImporterBaseTestSupport import gisdata + geourl = settings.GEODATABASE_URL class 
BaseImporterEndToEndTest(ImporterBaseTestSupport): - @classmethod def setUpClass(cls) -> None: super().setUpClass() @@ -35,15 +35,13 @@ def setUpClass(cls) -> None: } cls.valid_kml = f"{project_dir}/tests/fixture/valid.kml" - cls.url = reverse('importer_upload') - ogc_server_settings = OGC_Servers_Handler(settings.OGC_SERVER)['default'] + cls.url = reverse("importer_upload") + ogc_server_settings = OGC_Servers_Handler(settings.OGC_SERVER)["default"] _user, _password = ogc_server_settings.credentials cls.cat = Catalog( - service_url=ogc_server_settings.rest, - username=_user, - password=_password + service_url=ogc_server_settings.rest, username=_user, password=_password ) def setUp(self) -> None: @@ -61,10 +59,21 @@ def _assertimport(self, payload, initial_name): # if is async, we must wait. It will wait for 1 min before raise exception if ast.literal_eval(os.getenv("ASYNC_SIGNALS", "False")): tentative = 1 - while ExecutionRequest.objects.get(exec_id=response.json().get("execution_id")) != ExecutionRequest.STATUS_FINISHED and tentative <= 6: + while ( + ExecutionRequest.objects.get( + exec_id=response.json().get("execution_id") + ) + != ExecutionRequest.STATUS_FINISHED + and tentative <= 6 + ): time.sleep(10) tentative += 1 - if ExecutionRequest.objects.get(exec_id=response.json().get("execution_id")).status != ExecutionRequest.STATUS_FINISHED: + if ( + ExecutionRequest.objects.get( + exec_id=response.json().get("execution_id") + ).status + != ExecutionRequest.STATUS_FINISHED + ): raise Exception("Async still in progress after 1 min of waiting") # check if the dynamic model is created @@ -87,14 +96,15 @@ def _assertimport(self, payload, initial_name): class ImporterGeoPackageImportTest(BaseImporterEndToEndTest): - @mock.patch.dict(os.environ, {"GEONODE_GEODATABASE": "test_geonode_data"}) - @override_settings(GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data") + @override_settings( + GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data" + ) def test_import_geopackage(self): layer = self.cat.get_layer("geonode:stazioni_metropolitana") self.cat.delete(layer) payload = { - "base_file": open(self.valid_gkpg, 'rb'), + "base_file": open(self.valid_gkpg, "rb"), } initial_name = "stazioni_metropolitana" self._assertimport(payload, initial_name) @@ -103,15 +113,16 @@ def test_import_geopackage(self): class ImporterGeoJsonImportTest(BaseImporterEndToEndTest): - @mock.patch.dict(os.environ, {"GEONODE_GEODATABASE": "test_geonode_data"}) - @override_settings(GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data") + @override_settings( + GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data" + ) def test_import_geojson(self): layer = self.cat.get_layer("geonode:valid") self.cat.delete(layer) payload = { - "base_file": open(self.valid_geojson, 'rb'), + "base_file": open(self.valid_geojson, "rb"), } initial_name = "valid" self._assertimport(payload, initial_name) @@ -120,14 +131,15 @@ def test_import_geojson(self): class ImporterKMLImportTest(BaseImporterEndToEndTest): - @mock.patch.dict(os.environ, {"GEONODE_GEODATABASE": "test_geonode_data"}) - @override_settings(GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data") + @override_settings( + GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data" + ) def test_import_kml(self): layer = self.cat.get_layer("geonode:extruded_polygon") self.cat.delete(layer) payload = { - "base_file": open(self.valid_kml, 'rb'), + "base_file": open(self.valid_kml, "rb"), 
} initial_name = "extruded_polygon" self._assertimport(payload, initial_name) @@ -136,13 +148,16 @@ def test_import_kml(self): class ImporterShapefileImportTest(BaseImporterEndToEndTest): - @mock.patch.dict(os.environ, {"GEONODE_GEODATABASE": "test_geonode_data"}) - @override_settings(GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data") + @override_settings( + GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data" + ) def test_import_shapefile(self): layer = self.cat.get_layer("geonode:san_andres_y_providencia_highway") self.cat.delete(layer) - payload = {_filename: open(_file, 'rb') for _filename, _file in self.valid_shp.items()} + payload = { + _filename: open(_file, "rb") for _filename, _file in self.valid_shp.items() + } initial_name = "san_andres_y_providencia_highway" self._assertimport(payload, initial_name) layer = self.cat.get_layer("geonode:san_andres_y_providencia_highway") diff --git a/importer/tests/end2end/test_end2end_copy.py b/importer/tests/end2end/test_end2end_copy.py index 1dfb5131..cf4d353d 100644 --- a/importer/tests/end2end/test_end2end_copy.py +++ b/importer/tests/end2end/test_end2end_copy.py @@ -34,8 +34,8 @@ def setUpClass(cls) -> None: "prj_file": f"{file_path}/san_andres_y_providencia_highway.prj", "shx_file": f"{file_path}/san_andres_y_providencia_highway.shx", } - cls.url_create = reverse('importer_upload') - ogc_server_settings = OGC_Servers_Handler(settings.OGC_SERVER)['default'] + cls.url_create = reverse("importer_upload") + ogc_server_settings = OGC_Servers_Handler(settings.OGC_SERVER)["default"] cls.valid_kml = f"{project_dir}/tests/fixture/valid.kml" cls.url_create = reverse("importer_upload") ogc_server_settings = OGC_Servers_Handler(settings.OGC_SERVER)["default"] @@ -67,7 +67,7 @@ def tearDown(self) -> None: def _assertCloning(self, initial_name): # getting the geonode resource - dataset = Dataset.objects.get(alternate__icontains=f'geonode:{initial_name}') + dataset = Dataset.objects.get(alternate__icontains=f"geonode:{initial_name}") prev_dataset_count = Dataset.objects.count() self.client.force_login(get_user_model().objects.get(username="admin")) # creating the url and login @@ -165,11 +165,14 @@ def test_copy_dataset_from_geojson(self): class ImporterCopyEnd2EndShapeFileTest(BaseClassEnd2End): - @mock.patch.dict(os.environ, {"GEONODE_GEODATABASE": "test_geonode_data"}) - @override_settings(GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data") + @override_settings( + GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data" + ) def test_copy_dataset_from_shapefile(self): - payload = {_filename: open(_file, 'rb') for _filename, _file in self.valid_shp.items()} + payload = { + _filename: open(_file, "rb") for _filename, _file in self.valid_shp.items() + } initial_name = "san_andres_y_providencia_highway" # first we need to import a resource with transaction.atomic(): diff --git a/importer/tests/unit/test_models.py b/importer/tests/unit/test_models.py index 7bf1bd85..7ae3e1bf 100644 --- a/importer/tests/unit/test_models.py +++ b/importer/tests/unit/test_models.py @@ -9,7 +9,10 @@ class TestModelSchemaSignal(TestCase): def setUp(self): self.resource = create_single_dataset(name="test_dataset") - ResourceHandlerInfo.objects.create(resource=self.resource, handler_module_path="importer.handlers.shapefile.handler.ShapeFileHandler") + ResourceHandlerInfo.objects.create( + resource=self.resource, + handler_module_path="importer.handlers.shapefile.handler.ShapeFileHandler", + ) 
self.dynamic_model = ModelSchema.objects.create( name=self.resource.name, db_name="datastore" ) diff --git a/importer/tests/unit/test_task.py b/importer/tests/unit/test_task.py index 332e78af..002041e3 100644 --- a/importer/tests/unit/test_task.py +++ b/importer/tests/unit/test_task.py @@ -13,7 +13,7 @@ import_resource, orchestrator, publish_resource, - rollback + rollback, ) from geonode.resource.models import ExecutionRequest from geonode.layers.models import Dataset @@ -23,13 +23,15 @@ from dynamic_models.models import ModelSchema, FieldSchema from dynamic_models.exceptions import DynamicModelError, InvalidFieldNameError -from importer.tests.utils import ImporterBaseTestSupport, TransactionImporterBaseTestSupport +from importer.tests.utils import ( + ImporterBaseTestSupport, + TransactionImporterBaseTestSupport, +) # Create your tests here. class TestCeleryTasks(ImporterBaseTestSupport): - def setUp(self): self.user = get_user_model().objects.first() self.exec_id = orchestrator.create_execution_request( @@ -167,10 +169,10 @@ def test_publish_resource_if_overwrite_should_call_the_publishing( extract_resource_to_publish, importer, ): - ''' + """ Publish resource should be called since the resource does not exists in geoserver even if an overwrite is required - ''' + """ try: publish_resources.return_value = True extract_resource_to_publish.return_value = [ @@ -219,10 +221,10 @@ def test_publish_resource_if_overwrite_should_not_call_the_publishing( extract_resource_to_publish, importer, ): - ''' + """ Publish resource should be called since the resource does not exists in geoserver even if an overwrite is required - ''' + """ try: get_resource.return_falue = True publish_resources.return_value = True @@ -264,7 +266,6 @@ def test_publish_resource_if_overwrite_should_not_call_the_publishing( @patch("importer.celery_tasks.import_orchestrator.apply_async") def test_create_geonode_resource(self, import_orchestrator): try: - alternate = "geonode:alternate_foo_dataset" self.assertFalse(Dataset.objects.filter(alternate=alternate).exists()) @@ -295,7 +296,6 @@ def test_create_geonode_resource(self, import_orchestrator): def test_copy_geonode_resource_should_raise_exeption_if_the_alternate_not_exists( self, async_call ): - with self.assertRaises(Exception): copy_geonode_resource( str(self.exec_id), @@ -345,20 +345,32 @@ def test_copy_geonode_resource(self, async_call): @patch("importer.handlers.gpkg.handler.GPKGFileHandler._import_resource_rollback") @patch("importer.handlers.gpkg.handler.GPKGFileHandler._publish_resource_rollback") - @patch("importer.handlers.gpkg.handler.GPKGFileHandler._create_geonode_resource_rollback") + @patch( + "importer.handlers.gpkg.handler.GPKGFileHandler._create_geonode_resource_rollback" + ) def test_rollback_works_as_expected_vector_step( self, _create_geonode_resource_rollback, _publish_resource_rollback, - _import_resource_rollback + _import_resource_rollback, ): - ''' + """ rollback should remove the resource based on the step it has reached - ''' + """ test_config = [ ("importer.import_resource", [_import_resource_rollback]), - ("importer.publish_resource", [_import_resource_rollback, _publish_resource_rollback]), - ("importer.create_geonode_resource", [_import_resource_rollback, _publish_resource_rollback, _create_geonode_resource_rollback]) + ( + "importer.publish_resource", + [_import_resource_rollback, _publish_resource_rollback], + ), + ( + "importer.create_geonode_resource", + [ + _import_resource_rollback, + _publish_resource_rollback, + 
_create_geonode_resource_rollback, + ], + ), ] for conf in test_config: try: @@ -366,12 +378,12 @@ def test_rollback_works_as_expected_vector_step( user=get_user_model().objects.get(username=self.user), func_name="dummy_func", step=conf[0], # step name - action='import', + action="import", input_params={ "files": {"base_file": "/filepath"}, "overwrite_existing_layer": True, "store_spatial_files": True, - "handler_module_path": "importer.handlers.gpkg.handler.GPKGFileHandler" + "handler_module_path": "importer.handlers.gpkg.handler.GPKGFileHandler", }, ) rollback(str(exec_id)) @@ -379,7 +391,7 @@ def test_rollback_works_as_expected_vector_step( # Evaluation req = ExecutionRequest.objects.get(exec_id=str(exec_id)) self.assertEqual("importer.rollback", req.step) - self.assertTrue(req.status=='failed') + self.assertTrue(req.status == "failed") for expected_function in conf[1]: expected_function.assert_called_once() expected_function.reset_mock() @@ -388,23 +400,38 @@ def test_rollback_works_as_expected_vector_step( if exec_id: ExecutionRequest.objects.filter(exec_id=str(exec_id)).delete() - - @patch("importer.handlers.geotiff.handler.GeoTiffFileHandler._import_resource_rollback") - @patch("importer.handlers.geotiff.handler.GeoTiffFileHandler._publish_resource_rollback") - @patch("importer.handlers.geotiff.handler.GeoTiffFileHandler._create_geonode_resource_rollback") + @patch( + "importer.handlers.geotiff.handler.GeoTiffFileHandler._import_resource_rollback" + ) + @patch( + "importer.handlers.geotiff.handler.GeoTiffFileHandler._publish_resource_rollback" + ) + @patch( + "importer.handlers.geotiff.handler.GeoTiffFileHandler._create_geonode_resource_rollback" + ) def test_rollback_works_as_expected_raster( self, _create_geonode_resource_rollback, _publish_resource_rollback, - _import_resource_rollback + _import_resource_rollback, ): - ''' + """ rollback should remove the resource based on the step it has reached - ''' + """ test_config = [ ("importer.import_resource", [_import_resource_rollback]), - ("importer.publish_resource", [_import_resource_rollback, _publish_resource_rollback]), - ("importer.create_geonode_resource", [_import_resource_rollback, _publish_resource_rollback, _create_geonode_resource_rollback]) + ( + "importer.publish_resource", + [_import_resource_rollback, _publish_resource_rollback], + ), + ( + "importer.create_geonode_resource", + [ + _import_resource_rollback, + _publish_resource_rollback, + _create_geonode_resource_rollback, + ], + ), ] for conf in test_config: try: @@ -412,12 +439,12 @@ def test_rollback_works_as_expected_raster( user=get_user_model().objects.get(username=self.user), func_name="dummy_func", step=conf[0], # step name - action='import', + action="import", input_params={ "files": {"base_file": "/filepath"}, "overwrite_existing_layer": True, "store_spatial_files": True, - "handler_module_path": "importer.handlers.geotiff.handler.GeoTiffFileHandler" + "handler_module_path": "importer.handlers.geotiff.handler.GeoTiffFileHandler", }, ) rollback(str(exec_id)) @@ -425,7 +452,7 @@ def test_rollback_works_as_expected_raster( # Evaluation req = ExecutionRequest.objects.get(exec_id=str(exec_id)) self.assertEqual("importer.rollback", req.step) - self.assertTrue(req.status=='failed') + self.assertTrue(req.status == "failed") for expected_function in conf[1]: expected_function.assert_called_once() expected_function.reset_mock() @@ -486,7 +513,10 @@ def test_create_dynamic_structure_should_raise_error_if_field_class_is_none(self layer_name="test_layer", ) - 
expected_msg = "Error during the field creation. The field or class_name is None {'name': 'field1', 'class_name': None, 'null': True} for test_layer " + f"for execution {name}" + expected_msg = ( + "Error during the field creation. The field or class_name is None {'name': 'field1', 'class_name': None, 'null': True} for test_layer " + + f"for execution {name}" + ) self.assertEqual(expected_msg, str(_exc.exception)) finally: ModelSchema.objects.filter(name=f"schema_{name}").delete() @@ -526,7 +556,9 @@ def test_copy_dynamic_model_should_work(self, async_call): name = str(self.exec_id) # setup model schema to be copied schema = ModelSchema.objects.create( - name=f"schema_{name}", db_name="datastore", db_table_name=f"schema_{name}" + name=f"schema_{name}", + db_name="datastore", + db_table_name=f"schema_{name}", ) FieldSchema.objects.create( name=f"field_{name}", @@ -538,8 +570,10 @@ def test_copy_dynamic_model_should_work(self, async_call): layer.alternate = f"geonode:schema_{name}" layer.save() - self.assertTrue(ModelSchema.objects.filter(name__icontains=f"schema_").count() == 1) - + self.assertTrue( + ModelSchema.objects.filter(name__icontains=f"schema_").count() == 1 + ) + copy_dynamic_model( exec_id=str(self.exec_id), actual_step="copy", @@ -553,11 +587,13 @@ def test_copy_dynamic_model_should_work(self, async_call): ) # the alternate is generated internally self.assertTrue(ModelSchema.objects.filter(name=f"schema_{name}").exists()) - self.assertTrue(ModelSchema.objects.filter(name__icontains=f"schema_").count() == 2) + self.assertTrue( + ModelSchema.objects.filter(name__icontains=f"schema_").count() == 2 + ) schema = ModelSchema.objects.all() for val in schema: - self.assertEqual(val.name , val.db_table_name) + self.assertEqual(val.name, val.db_table_name) async_call.assert_called_once() diff --git a/importer/tests/utils.py b/importer/tests/utils.py index a927f966..1af863e6 100644 --- a/importer/tests/utils.py +++ b/importer/tests/utils.py @@ -3,32 +3,30 @@ class ImporterBaseTestSupport(TestCase): - - databases = ('default', 'datastore') + databases = ("default", "datastore") multi_db = True @classmethod def setUpClass(cls) -> None: super().setUpClass() - ''' + """ Why manually load the fixture after the setupClass? Django in the setUpClass method, load the fixture in all the databases that are defined in the databases attribute. The problem is that the datastore database will contain only the dyanmic models infrastructure and not the whole geonode structure. 
So that, having the fixture as a attribute will raise and error - ''' + """ fixture = [ - 'initial_data.json', - 'group_test_data.json', - 'default_oauth_apps.json' + "initial_data.json", + "group_test_data.json", + "default_oauth_apps.json", ] - call_command('loaddata', *fixture, **{'verbosity': 0, 'database': "default"}) + call_command("loaddata", *fixture, **{"verbosity": 0, "database": "default"}) class TransactionImporterBaseTestSupport(TransactionTestCase): - databases = ("default", "datastore") multi_db = True diff --git a/importer/utils.py b/importer/utils.py index e4311daa..24b5b4c6 100644 --- a/importer/utils.py +++ b/importer/utils.py @@ -30,9 +30,17 @@ def update(self, uuid, **kwargs) -> ResourceBase: custom_resource_manager = ResourceManager(concrete_manager=ImporterConcreteManager()) -def call_rollback_function(execution_id, handlers_module_path, prev_action, layer=None, alternate=None, error=None, **kwargs): +def call_rollback_function( + execution_id, + handlers_module_path, + prev_action, + layer=None, + alternate=None, + error=None, + **kwargs, +): from importer.celery_tasks import import_orchestrator - + task_params = ( {}, execution_id, @@ -42,19 +50,20 @@ def call_rollback_function(execution_id, handlers_module_path, prev_action, laye alternate, ImporterRequestAction.ROLLBACK.value, ) - kwargs['previous_action'] = prev_action + kwargs["previous_action"] = prev_action kwargs["error"] = error_handler(error, exec_id=execution_id) import_orchestrator.apply_async(task_params, kwargs) def find_key_recursively(obj, key): - ''' + """ Celery (unluckly) append the kwargs for each task under a new kwargs key, so sometimes is faster to look into the key recursively instead of parsing the dict - ''' - if key in obj: return obj.get(key, None) + """ + if key in obj: + return obj.get(key, None) for _, v in obj.items(): if isinstance(v, dict): return find_key_recursively(v, key)
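For reference, a small usage sketch of find_key_recursively as defined in the importer/utils.py hunk above. The nested dict mirrors the kwargs-inside-kwargs shape the docstring attributes to Celery; the sample keys and values are hypothetical.

    # Illustrative sketch, not part of the patch.
    from importer.utils import find_key_recursively

    # Celery-style nesting: the task kwargs end up one level down under "kwargs".
    task_kwargs = {"kwargs": {"error": "boom", "previous_action": "import"}}
    assert find_key_recursively(task_kwargs, "error") == "boom"
    assert find_key_recursively(task_kwargs, "missing") is None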