Skip to content

Commit

Permalink
Black reformatting
Browse files Browse the repository at this point in the history
  • Loading branch information
giohappy committed Jul 6, 2023
1 parent 70fab7c commit 0f464df
Show file tree
Hide file tree
Showing 40 changed files with 908 additions and 529 deletions.
1 change: 0 additions & 1 deletion importer/api/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

class ImporterSerializer(DynamicModelSerializer):
class Meta:

ref_name = "ImporterSerializer"
model = Upload
view_name = "importer_upload"
Expand Down
20 changes: 9 additions & 11 deletions importer/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@


class TestImporterViewSet(ImporterBaseTestSupport):

@classmethod
def setUpClass(cls):
super().setUpClass()
Expand Down Expand Up @@ -45,7 +44,7 @@ def test_redirect_to_old_upload_if_file_is_not_a_gpkg(self, patch_upload):
upload.upload.return_value = HttpResponse()
patch_upload.return_value = upload

self.client.force_login(get_user_model().objects.get(username='admin'))
self.client.force_login(get_user_model().objects.get(username="admin"))
payload = {
"base_file": SimpleUploadedFile(name="file.invalid", content=b"abc"),
}
Expand All @@ -59,7 +58,7 @@ def test_gpkg_raise_error_with_invalid_payload(self, patch_upload):
upload.upload.return_value = HttpResponse()
patch_upload.return_value = upload

self.client.force_login(get_user_model().objects.get(username='admin'))
self.client.force_login(get_user_model().objects.get(username="admin"))
payload = {
"base_file": SimpleUploadedFile(name="test.gpkg", content=b"some-content"),
"store_spatial_files": "invalid",
Expand All @@ -79,7 +78,7 @@ def test_gpkg_raise_error_with_invalid_payload(self, patch_upload):
def test_gpkg_task_is_called(self, patch_upload):
patch_upload.apply_async.side_effect = MagicMock()

self.client.force_login(get_user_model().objects.get(username='admin'))
self.client.force_login(get_user_model().objects.get(username="admin"))
payload = {
"base_file": SimpleUploadedFile(name="test.gpkg", content=b"some-content"),
"store_spatial_files": True,
Expand All @@ -93,7 +92,7 @@ def test_gpkg_task_is_called(self, patch_upload):
def test_geojson_task_is_called(self, patch_upload):
patch_upload.apply_async.side_effect = MagicMock()

self.client.force_login(get_user_model().objects.get(username='admin'))
self.client.force_login(get_user_model().objects.get(username="admin"))
payload = {
"base_file": SimpleUploadedFile(
name="test.geojson", content=b"some-content"
Expand All @@ -111,7 +110,7 @@ def test_geojson_task_is_called(self, patch_upload):
def test_zip_file_is_unzip_and_the_handler_is_found(self, patch_upload):
patch_upload.apply_async.side_effect = MagicMock()

self.client.force_login(get_user_model().objects.get(username='admin'))
self.client.force_login(get_user_model().objects.get(username="admin"))
payload = {
"base_file": open(f"{project_dir}/tests/fixture/valid.zip", "rb"),
"zip_file": open(f"{project_dir}/tests/fixture/valid.zip", "rb"),
Expand All @@ -123,8 +122,7 @@ def test_zip_file_is_unzip_and_the_handler_is_found(self, patch_upload):
self.assertEqual(201, response.status_code)

def test_copy_method_not_allowed(self):

self.client.force_login(get_user_model().objects.get(username='admin'))
self.client.force_login(get_user_model().objects.get(username="admin"))

response = self.client.get(self.copy_url)
self.assertEqual(405, response.status_code)
Expand All @@ -139,7 +137,7 @@ def test_copy_method_not_allowed(self):
@patch("importer.api.views.ResourceBaseViewSet.resource_service_copy")
def test_redirect_to_old_upload_if_file_handler_is_not_set(self, copy_view, _orc):
copy_view.return_value = HttpResponse()
self.client.force_login(get_user_model().objects.get(username='admin'))
self.client.force_login(get_user_model().objects.get(username="admin"))

response = self.client.put(self.copy_url)

Expand All @@ -149,10 +147,10 @@ def test_redirect_to_old_upload_if_file_handler_is_not_set(self, copy_view, _orc

@patch("importer.api.views.import_orchestrator")
def test_copy_ther_resource_if_file_handler_is_set(self, _orc):
user = get_user_model().objects.get(username='admin')
user = get_user_model().objects.get(username="admin")
user.is_superuser = True
user.save()
self.client.force_login(get_user_model().objects.get(username='admin'))
self.client.force_login(get_user_model().objects.get(username="admin"))
ResourceHandlerInfo.objects.create(
resource=self.dataset,
handler_module_path="importer.handlers.gpkg.handler.GPKGFileHandler",
Expand Down
8 changes: 2 additions & 6 deletions importer/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def get_serializer_class(self):
return specific_serializer or ImporterSerializer

def create(self, request, *args, **kwargs):

"""
Main function called by the new import flow.
It received the file via the front end
Expand Down Expand Up @@ -120,7 +119,6 @@ def create(self, request, *args, **kwargs):
handler = orchestrator.get_handler(_data)

if _file and handler:

try:
# cloning data into a local folder
extracted_params, _data = handler.extract_params_from_data(_data)
Expand Down Expand Up @@ -151,7 +149,7 @@ def create(self, request, *args, **kwargs):
legacy_upload_name=_file.name,
action=action,
name=_file.name,
source='upload'
source="upload",
)

sig = import_orchestrator.s(
Expand All @@ -175,7 +173,6 @@ def create(self, request, *args, **kwargs):


class ResourceImporter(DynamicModelViewSet):

authentication_classes = [
SessionAuthentication,
BasicAuthentication,
Expand Down Expand Up @@ -212,7 +209,6 @@ class ResourceImporter(DynamicModelViewSet):
def copy(self, request, *args, **kwargs):
resource = self.get_object()
if resource.resourcehandlerinfo_set.exists():

handler_module_path = (
resource.resourcehandlerinfo_set.first().handler_module_path
)
Expand Down Expand Up @@ -241,7 +237,7 @@ def copy(self, request, *args, **kwargs):
**{"handler_module_path": handler_module_path},
**extracted_params,
},
source="importer_copy"
source="importer_copy",
)

sig = import_orchestrator.s(
Expand Down
83 changes: 52 additions & 31 deletions importer/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
from importer.celery_app import importer_app
from importer.datastore import DataStoreManager
from importer.handlers.gpkg.tasks import SingleMessageErrorHandler
from importer.handlers.utils import create_alternate, drop_dynamic_model_schema, evaluate_error, get_uuid
from importer.handlers.utils import (
create_alternate,
drop_dynamic_model_schema,
evaluate_error,
get_uuid,
)
from importer.orchestrator import orchestrator
from importer.publisher import DataPublisher
from importer.settings import (
Expand All @@ -39,6 +44,7 @@ class ErrorBaseTaskClass(Task):
Basic Error task class. Is common to all the base tasks of the import pahse
it defines a on_failure method which set the task as "failed" with some extra information
"""

max_retries = 3
track_started = True

Expand Down Expand Up @@ -69,7 +75,6 @@ def import_orchestrator(
action=exa.IMPORT.value,
**kwargs,
):

"""
Base task. Is the task responsible to call the orchestrator and redirect the upload to the next step
mainly is a wrapper for the Orchestrator object.
Expand Down Expand Up @@ -102,7 +107,7 @@ def import_orchestrator(

@importer_app.task(
bind=True,
#base=ErrorBaseTaskClass,
# base=ErrorBaseTaskClass,
name="importer.import_resource",
queue="importer.import_resource",
max_retries=1,
Expand Down Expand Up @@ -162,7 +167,7 @@ def import_resource(self, execution_id, /, handler_module_path, action, **kwargs
layer=None,
alternate=None,
error=e,
**kwargs
**kwargs,
)
raise InvalidInputFileException(detail=error_handler(e, execution_id))

Expand Down Expand Up @@ -225,7 +230,9 @@ def publish_resource(
)
if data:
# we should not publish resource without a crs
if not _overwrite or (_overwrite and not _publisher.get_resource(alternate)):
if not _overwrite or (
_overwrite and not _publisher.get_resource(alternate)
):
_publisher.publish_resources(data)
else:
_publisher.overwrite_resources(data)
Expand All @@ -237,7 +244,9 @@ def publish_resource(
celery_task_request=self.request,
)
else:
logger.error(f"Layer: {alternate} raised: Only resources with a CRS provided can be published for execution_id: {execution_id}")
logger.error(
f"Layer: {alternate} raised: Only resources with a CRS provided can be published for execution_id: {execution_id}"
)
raise PublishResourceException(
"Only resources with a CRS provided can be published"
)
Expand Down Expand Up @@ -268,7 +277,7 @@ def publish_resource(
layer=layer_name,
alternate=alternate,
error=e,
**kwargs
**kwargs,
)
raise PublishResourceException(detail=error_handler(e, execution_id))

Expand Down Expand Up @@ -326,17 +335,27 @@ def create_geonode_resource(

if _overwrite:
resource = handler.overwrite_geonode_resource(
layer_name=layer_name, alternate=alternate, execution_id=execution_id, files=_files
layer_name=layer_name,
alternate=alternate,
execution_id=execution_id,
files=_files,
)
else:
resource = handler.create_geonode_resource(
layer_name=layer_name, alternate=alternate, execution_id=execution_id, files=_files
layer_name=layer_name,
alternate=alternate,
execution_id=execution_id,
files=_files,
)

if _overwrite:
handler.overwrite_resourcehandlerinfo(handler_module_path, resource, _exec, **kwargs)
handler.overwrite_resourcehandlerinfo(
handler_module_path, resource, _exec, **kwargs
)
else:
handler.create_resourcehandlerinfo(handler_module_path, resource, _exec, **kwargs)
handler.create_resourcehandlerinfo(
handler_module_path, resource, _exec, **kwargs
)

# at the end recall the import_orchestrator for the next step
import_orchestrator.apply_async(
Expand All @@ -360,7 +379,7 @@ def create_geonode_resource(
layer=layer_name,
alternate=alternate,
error=e,
**kwargs
**kwargs,
)
raise ResourceCreationException(detail=error_handler(e))

Expand Down Expand Up @@ -417,10 +436,14 @@ def copy_geonode_resource(
_exec=_exec,
data_to_update=data_to_update,
new_alternate=new_alternate,
**kwargs
**kwargs,
)

handler.create_resourcehandlerinfo(resource=new_resource, handler_module_path=handler_module_path, execution_id=_exec)
handler.create_resourcehandlerinfo(
resource=new_resource,
handler_module_path=handler_module_path,
execution_id=_exec,
)

assert f"{workspace}:{new_alternate}" == new_resource.alternate

Expand Down Expand Up @@ -453,7 +476,7 @@ def copy_geonode_resource(
layer=layer_name,
alternate=alternate,
error=e,
**kwargs
**kwargs,
)
raise CopyResourceException(detail=e)
return exec_id, new_alternate
Expand Down Expand Up @@ -510,9 +533,9 @@ def _create_field(dynamic_model_schema, field, _kwargs):
if field["class_name"].endswith("CharField"):
_kwargs = {**_kwargs, **{"max_length": 255}}

if field.get('dim', None) is not None:
if field.get("dim", None) is not None:
# setting the dimension for the gemetry. So that we can handle also 3d geometries
_kwargs = {**_kwargs, **{"dim": field.get('dim')}}
_kwargs = {**_kwargs, **{"dim": field.get("dim")}}

# if is a new creation we generate the field model from scratch
if not overwrite:
Expand Down Expand Up @@ -584,7 +607,7 @@ def copy_dynamic_model(
# Creating the dynamic schema object
new_schema = dynamic_schema.first()
new_schema.name = new_dataset_alternate
new_schema.db_table_name = new_dataset_alternate
new_schema.db_table_name = new_dataset_alternate
new_schema.pk = None
new_schema.save()
# create the field_schema object
Expand Down Expand Up @@ -622,7 +645,7 @@ def copy_dynamic_model(
layer=layer_name,
alternate=alternate,
error=e,
**{**kwargs, **additional_kwargs}
**{**kwargs, **additional_kwargs},
)
raise CopyResourceException(detail=e)
return exec_id, kwargs
Expand All @@ -639,9 +662,8 @@ def copy_geonode_data_table(
):
"""
Once the base resource is copied, is time to copy also the dynamic model
"""
"""
try:

orchestrator.update_execution_request_status(
execution_id=exec_id,
last_updated=timezone.now(),
Expand All @@ -659,7 +681,9 @@ def copy_geonode_data_table(

db_name = os.getenv("DEFAULT_BACKEND_DATASTORE", "datastore")
if os.getenv("IMPORTER_ENABLE_DYN_MODELS", False):
schema_exists = ModelSchema.objects.filter(name=new_dataset_alternate).first()
schema_exists = ModelSchema.objects.filter(
name=new_dataset_alternate
).first()
if schema_exists:
db_name = schema_exists.db_name

Expand Down Expand Up @@ -691,7 +715,7 @@ def copy_geonode_data_table(
layer=layer_name,
alternate=alternate,
error=e,
**kwargs
**kwargs,
)
raise CopyResourceException(detail=e)
return exec_id, kwargs
Expand All @@ -710,7 +734,7 @@ def rollback(self, *args, **kwargs):
The handler must implement the code to rollback each step that
is declared
"""

exec_id = get_uuid(args)

logger.info(f"Calling rollback for execution_id {exec_id} in progress")
Expand All @@ -729,14 +753,11 @@ def rollback(self, *args, **kwargs):
)

handler = import_string(handler_module_path)()
handler.rollback(
exec_id,
rollback_from_step,
action_to_rollback,
*args,
**kwargs
handler.rollback(exec_id, rollback_from_step, action_to_rollback, *args, **kwargs)
error = (
find_key_recursively(kwargs, "error")
or "Some issue has occured, please check the logs"
)
error = find_key_recursively(kwargs, "error") or "Some issue has occured, please check the logs"
orchestrator.set_as_failed(exec_id, reason=error)
return exec_id, kwargs

Expand Down
3 changes: 1 addition & 2 deletions importer/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,4 @@ def start_import(self, execution_id, **kwargs):
"""
call the resource handler object to perform the import phase
"""
return self.handler().import_resource(self.files, execution_id, **kwargs)

return self.handler().import_resource(self.files, execution_id, **kwargs)
Loading

0 comments on commit 0f464df

Please sign in to comment.