From bf1fd1a27c13efeb6af11a6d857d49b15ff4aa4d Mon Sep 17 00:00:00 2001 From: Chatewgne Date: Thu, 21 Nov 2024 12:08:11 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=20[REF]=20Move=20GPX/KML=20handling?= =?UTF-8?q?=20methods=20to=20Parsers=20utils=20(refs=20#3947)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- geotrek/cirkwi/parsers.py | 4 +- geotrek/common/tests/test_utils.py | 152 +++++++++- geotrek/common/utils/parsers.py | 121 ++++++++ geotrek/trekking/parsers.py | 138 +-------- .../a_trek_with_bad_geom.json | 277 ++++++++++++++++++ geotrek/trekking/tests/test_parsers.py | 154 +--------- 6 files changed, 576 insertions(+), 270 deletions(-) create mode 100644 geotrek/trekking/tests/data/apidae_trek_parser/a_trek_with_bad_geom.json diff --git a/geotrek/cirkwi/parsers.py b/geotrek/cirkwi/parsers.py index 8fb1fb311c..3a4d1c9c48 100644 --- a/geotrek/cirkwi/parsers.py +++ b/geotrek/cirkwi/parsers.py @@ -5,12 +5,12 @@ from django.contrib.gis.geos import Point, MultiPoint, GEOSGeometry from django.utils.translation import gettext as _ +from geotrek.common.utils.parsers import get_geom_from_gpx from geotrek.trekking.models import DifficultyLevel from geotrek.cirkwi.models import CirkwiLocomotion from geotrek.common.parsers import AttachmentParserMixin, GlobalImportError, Parser, RowImportError from geotrek.tourism.models import TouristicContent, TouristicContentType1 from geotrek.trekking.models import Trek, Practice -from geotrek.trekking.parsers import ApidaeTrekParser class CirkwiParser(AttachmentParserMixin, Parser): @@ -154,7 +154,7 @@ class CirkwiTrekParser(CirkwiParser): def filter_geom(self, src, val): response = self.request_or_retry(url=val) - return ApidaeTrekParser._get_geom_from_gpx(response.content) + return get_geom_from_gpx(response.content) def filter_practice(self, src, val): """ diff --git a/geotrek/common/tests/test_utils.py b/geotrek/common/tests/test_utils.py index d2250c0532..e440a04fca 100644 --- a/geotrek/common/tests/test_utils.py +++ b/geotrek/common/tests/test_utils.py @@ -1,13 +1,18 @@ import os +from shutil import copy as copyfile from django.conf import settings from django.contrib.gis.geos import Point -from django.test import SimpleTestCase, TestCase, override_settings +from django.test import SimpleTestCase, TestCase +from django.test.utils import override_settings from ..parsers import Parser -from ..utils import uniquify, format_coordinates, spatial_reference, simplify_coords +from ..utils import (format_coordinates, simplify_coords, spatial_reference, + uniquify) +from ..utils.file_infos import get_encoding_file from ..utils.import_celery import create_tmp_destination, subclasses -from ..utils.parsers import add_http_prefix +from ..utils.parsers import (add_http_prefix, get_geom_from_gpx, + get_geom_from_kml, maybe_fix_encoding_to_utf8) class UtilsTest(TestCase): @@ -100,3 +105,144 @@ def test_add_http_prefix_without_prefix(self): def test_add_http_prefix_with_prefix(self): self.assertEqual('http://test.com', add_http_prefix('http://test.com')) + + +class GpxToGeomTests(SimpleTestCase): + + @staticmethod + def _get_gpx_from(filename): + with open(filename, 'r') as f: + gpx = f.read() + return bytes(gpx, 'utf-8') + + def test_gpx_with_waypoint_can_be_converted(self): + gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/apidae_test_trek.gpx') + + geom = get_geom_from_gpx(gpx) + + self.assertEqual(geom.srid, 2154) + self.assertEqual(geom.geom_type, 'LineString') + self.assertEqual(len(geom.coords), 13) + first_point = geom.coords[0] + self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1) + self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1) + + def test_gpx_with_route_points_can_be_converted(self): + gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_route_points.gpx') + + geom = get_geom_from_gpx(gpx) + + self.assertEqual(geom.srid, 2154) + self.assertEqual(geom.geom_type, 'LineString') + self.assertEqual(len(geom.coords), 13) + first_point = geom.coords[0] + self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1) + self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1) + + def test_it_raises_an_error_on_not_continuous_segments(self): + gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_not_continuous_segments.gpx') + + with self.assertRaises(ValueError): + get_geom_from_gpx(gpx) + + def test_it_handles_segment_with_single_point(self): + gpx = self._get_gpx_from( + 'geotrek/trekking/tests/data/apidae_trek_parser/trace_with_single_point_segment.gpx' + ) + geom = get_geom_from_gpx(gpx) + + self.assertEqual(geom.srid, 2154) + self.assertEqual(geom.geom_type, 'LineString') + self.assertEqual(len(geom.coords), 13) + + def test_it_raises_an_error_when_no_linestring(self): + gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_no_feature.gpx') + + with self.assertRaises(ValueError): + get_geom_from_gpx(gpx) + + def test_it_handles_multiple_continuous_features(self): + gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_continuous_features.gpx') + geom = get_geom_from_gpx(gpx) + + self.assertEqual(geom.srid, 2154) + self.assertEqual(geom.geom_type, 'LineString') + self.assertEqual(len(geom.coords), 12) + first_point = geom.coords[0] + self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1) + self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1) + + def test_it_handles_multiple_continuous_features_with_one_empty(self): + gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_continuous_features_and_one_empty.gpx') + geom = get_geom_from_gpx(gpx) + + self.assertEqual(geom.srid, 2154) + self.assertEqual(geom.geom_type, 'LineString') + self.assertEqual(len(geom.coords), 12) + first_point = geom.coords[0] + self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1) + self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1) + + def test_it_raises_error_on_multiple_not_continuous_features(self): + gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_not_continuous_features.gpx') + with self.assertRaises(ValueError): + get_geom_from_gpx(gpx) + + +class KmlToGeomTests(SimpleTestCase): + + @staticmethod + def _get_kml_from(filename): + with open(filename, 'r') as f: + kml = f.read() + return bytes(kml, 'utf-8') + + def test_kml_can_be_converted(self): + kml = self._get_kml_from('geotrek/trekking/tests/data/apidae_trek_parser/trace.kml') + + geom = get_geom_from_kml(kml) + + self.assertEqual(geom.srid, 2154) + self.assertEqual(geom.geom_type, 'LineString') + self.assertEqual(len(geom.coords), 61) + first_point = geom.coords[0] + self.assertAlmostEqual(first_point[0], 973160.8, delta=0.1) + self.assertAlmostEqual(first_point[1], 6529320.1, delta=0.1) + + def test_it_raises_exception_when_no_linear_data(self): + kml = self._get_kml_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_no_line.kml') + + with self.assertRaises(ValueError): + get_geom_from_kml(kml) + + +class TestConvertEncodingFiles(TestCase): + data_dir = "geotrek/trekking/tests/data" + + def setUp(self): + if not os.path.exists(settings.TMP_DIR): + os.mkdir(settings.TMP_DIR) + + def test_fix_encoding_to_utf8(self): + file_name = f'{settings.TMP_DIR}/file_bad_encoding_tmp.kml' + copyfile(f'{self.data_dir}/file_bad_encoding.kml', file_name) + + encoding = get_encoding_file(file_name) + self.assertNotEqual(encoding, "utf-8") + + new_file_name = maybe_fix_encoding_to_utf8(file_name) + + encoding = get_encoding_file(new_file_name) + self.assertEqual(encoding, "utf-8") + + def test_not_fix_encoding_to_utf8(self): + file_name = f'{settings.TMP_DIR}/file_good_encoding_tmp.kml' + copyfile(f'{self.data_dir}/file_good_encoding.kml', file_name) + + encoding = get_encoding_file(file_name) + self.assertEqual(encoding, "utf-8") + + new_file_name = maybe_fix_encoding_to_utf8(file_name) + + encoding = get_encoding_file(new_file_name) + self.assertEqual(encoding, "utf-8") diff --git a/geotrek/common/utils/parsers.py b/geotrek/common/utils/parsers.py index 319ff6aaa9..246a22b355 100644 --- a/geotrek/common/utils/parsers.py +++ b/geotrek/common/utils/parsers.py @@ -1,5 +1,126 @@ +import codecs +import os +from datetime import datetime +from tempfile import NamedTemporaryFile + +from django.conf import settings +from django.contrib.gis.gdal import DataSource +from django.contrib.gis.geos import MultiLineString +from django.utils.translation import gettext as _ + +from geotrek.common.utils.file_infos import get_encoding_file + + def add_http_prefix(url): if url.startswith('http'): return url else: return 'http://' + url + + +def maybe_fix_encoding_to_utf8(file_name): + encoding = get_encoding_file(file_name) + + # If not utf-8, convert file to utf-8 + if encoding != "utf-8": + tmp_file_path = os.path.join(settings.TMP_DIR, 'fileNameTmp_' + str(datetime.now().timestamp())) + BLOCKSIZE = 9_048_576 + with codecs.open(file_name, "r", encoding) as sourceFile: + with codecs.open(tmp_file_path, "w", "utf-8") as targetFile: + while True: + contents = sourceFile.read(BLOCKSIZE) + if not contents: + break + targetFile.write(contents) + os.replace(tmp_file_path, file_name) + return file_name + + +def get_geom_from_gpx(data): + def convert_to_geos(geom): + # FIXME: is it right to try to correct input geometries? + # FIXME: how to log that info/spread errors? + if geom.geom_type == 'MultiLineString' and any([ls for ls in geom if ls.num_points == 1]): + # Handles that framework conversion fails when there are LineStrings of length 1 + geos_mls = MultiLineString([ls.geos for ls in geom if ls.num_points > 1]) + geos_mls.srid = geom.srid + return geos_mls + + return geom.geos + + def get_layer(datasource, layer_name): + for layer in datasource: + if layer.name == layer_name: + return layer + + def maybe_get_linestring_from_layer(layer): + if layer.num_feat == 0: + return None + geoms = [] + for feat in layer: + if feat.geom.num_coords == 0: + continue + geos = convert_to_geos(feat.geom) + if geos.geom_type == 'MultiLineString': + geos = geos.merged # If possible we merge the MultiLineString into a LineString + if geos.geom_type == 'MultiLineString': + raise ValueError( + _("Feature geometry cannot be converted to a single continuous LineString feature")) + geoms.append(geos) + + full_geom = MultiLineString(geoms) + full_geom.srid = geoms[0].srid + full_geom = full_geom.merged # If possible we merge the MultiLineString into a LineString + if full_geom.geom_type == 'MultiLineString': + raise ValueError( + _("Geometries from various features cannot be converted to a single continuous LineString feature")) + + return full_geom + + """Given GPX data as bytes it returns a geom.""" + # FIXME: is there another way than the temporary file? It seems not. `DataSource` really expects a filename. + with NamedTemporaryFile(mode='w+b', dir=settings.TMP_DIR) as ntf: + ntf.write(data) + ntf.flush() + + file_path = maybe_fix_encoding_to_utf8(ntf.name) + ds = DataSource(file_path) + for layer_name in ('tracks', 'routes'): + layer = get_layer(ds, layer_name) + geos = maybe_get_linestring_from_layer(layer) + if geos: + break + else: + raise ValueError("No LineString feature found in GPX layers tracks or routes") + geos.transform(settings.SRID) + return geos + + +def get_geom_from_kml(data): + """Given KML data as bytes it returns a geom.""" + + def get_geos_linestring(datasource): + layer = datasource[0] + geom = get_first_geom_with_type_in(types=['MultiLineString', 'LineString'], geoms=layer.get_geoms()) + geom.coord_dim = 2 + geos = geom.geos + if geos.geom_type == 'MultiLineString': + geos = geos.merged + return geos + + def get_first_geom_with_type_in(types, geoms): + for g in geoms: + for t in types: + if g.geom_type.name.startswith(t): + return g + raise ValueError('The attached KML geometry does not have any LineString or MultiLineString data') + + with NamedTemporaryFile(mode='w+b', dir=settings.TMP_DIR) as ntf: + ntf.write(data) + ntf.flush() + + file_path = maybe_fix_encoding_to_utf8(ntf.name) + ds = DataSource(file_path) + geos = get_geos_linestring(ds) + geos.transform(settings.SRID) + return geos diff --git a/geotrek/trekking/parsers.py b/geotrek/trekking/parsers.py index 92825d2101..123e37b76c 100644 --- a/geotrek/trekking/parsers.py +++ b/geotrek/trekking/parsers.py @@ -6,27 +6,27 @@ import zipfile import textwrap from collections import defaultdict -from datetime import date, timedelta, datetime +from datetime import date, timedelta from decimal import Decimal -from tempfile import NamedTemporaryFile -import codecs + import os from urllib.parse import urlparse from django.conf import settings -from django.contrib.gis.gdal import DataSource -from django.contrib.gis.geos import GEOSGeometry, MultiLineString, Point, LineString + +from django.contrib.gis.geos import GEOSGeometry, Point, LineString from django.utils.translation import get_language from django.utils.translation import gettext as _ from paperclip.models import attachment_upload, random_suffix_regexp from modeltranslation.utils import build_localized_fieldname -from geotrek.common.utils.file_infos import get_encoding_file + from geotrek.common.models import Label, Theme, License, Attachment from geotrek.common.parsers import (ApidaeBaseParser, AttachmentParserMixin, GeotrekParser, GlobalImportError, Parser, RowImportError, ShapeParser, DownloadImportError, ValueImportError) +from geotrek.common.utils.parsers import get_geom_from_gpx, get_geom_from_kml from geotrek.core.models import Path, Topology from geotrek.trekking.models import (POI, Accessibility, DifficultyLevel, OrderedTrekChild, Service, Trek, @@ -690,13 +690,16 @@ def filter_geom(self, src, val): geom_file = self._fetch_geometry_file(plan) ext = self._get_plan_extension(plan) - if ext == 'gpx': - return ApidaeTrekParser._get_geom_from_gpx(geom_file) - elif ext == 'kml': - return ApidaeTrekParser._get_geom_from_kml(geom_file) - elif ext == 'kmz': - kml_file = zipfile.ZipFile(io.BytesIO(geom_file)).read('doc.kml') - return ApidaeTrekParser._get_geom_from_kml(kml_file) + try: + if ext == 'gpx': + return get_geom_from_gpx(geom_file) + elif ext == 'kml': + return get_geom_from_kml(geom_file) + elif ext == 'kmz': + kml_file = zipfile.ZipFile(io.BytesIO(geom_file)).read('doc.kml') + return get_geom_from_kml(kml_file) + except ValueError as e: + raise RowImportError(str(e)) def filter_labels(self, src, val): typologies, environnements = val @@ -964,115 +967,6 @@ def _get_plan_extension(plan): return url_suffix.split('.')[1] return None - @staticmethod - def _get_geom_from_gpx(data): - """Given GPX data as bytes it returns a geom.""" - # FIXME: is there another way than the temporary file? It seems not. `DataSource` really expects a filename. - with NamedTemporaryFile(mode='w+b', dir=settings.TMP_DIR) as ntf: - ntf.write(data) - ntf.flush() - - file_path = ApidaeTrekParser._maybe_fix_encoding_to_utf8(ntf.name) - ds = DataSource(file_path) - for layer_name in ('tracks', 'routes'): - layer = ApidaeTrekParser._get_layer(ds, layer_name) - geos = ApidaeTrekParser._maybe_get_linestring_from_layer(layer) - if geos: - break - else: - raise RowImportError("No LineString feature found in GPX layers tracks or routes") - geos.transform(settings.SRID) - return geos - - @staticmethod - def _maybe_fix_encoding_to_utf8(file_name): - encoding = get_encoding_file(file_name) - - # If not utf-8, convert file to utf-8 - if encoding != "utf-8": - tmp_file_path = os.path.join(settings.TMP_DIR, 'fileNameTmp_' + str(datetime.now().timestamp())) - BLOCKSIZE = 9_048_576 - with codecs.open(file_name, "r", encoding) as sourceFile: - with codecs.open(tmp_file_path, "w", "utf-8") as targetFile: - while True: - contents = sourceFile.read(BLOCKSIZE) - if not contents: - break - targetFile.write(contents) - os.replace(tmp_file_path, file_name) - return file_name - - @staticmethod - def _get_geom_from_kml(data): - """Given KML data as bytes it returns a geom.""" - - def get_geos_linestring(datasource): - layer = datasource[0] - geom = get_first_geom_with_type_in(types=['MultiLineString', 'LineString'], geoms=layer.get_geoms()) - geom.coord_dim = 2 - geos = geom.geos - if geos.geom_type == 'MultiLineString': - geos = geos.merged - return geos - - def get_first_geom_with_type_in(types, geoms): - for g in geoms: - for t in types: - if g.geom_type.name.startswith(t): - return g - raise RowImportError('The attached KML geometry does not have any LineString or MultiLineString data') - - with NamedTemporaryFile(mode='w+b', dir=settings.TMP_DIR) as ntf: - ntf.write(data) - ntf.flush() - - file_path = ApidaeTrekParser._maybe_fix_encoding_to_utf8(ntf.name) - ds = DataSource(file_path) - geos = get_geos_linestring(ds) - geos.transform(settings.SRID) - return geos - - @staticmethod - def _get_layer(datasource, layer_name): - for layer in datasource: - if layer.name == layer_name: - return layer - - @staticmethod - def _convert_to_geos(geom): - # FIXME: is it right to try to correct input geometries? - # FIXME: how to log that info/spread errors? - if geom.geom_type == 'MultiLineString' and any([ls for ls in geom if ls.num_points == 1]): - # Handles that framework conversion fails when there are LineStrings of length 1 - geos_mls = MultiLineString([ls.geos for ls in geom if ls.num_points > 1]) - geos_mls.srid = geom.srid - return geos_mls - - return geom.geos - - @staticmethod - def _maybe_get_linestring_from_layer(layer): - if layer.num_feat == 0: - return None - geoms = [] - for feat in layer: - if feat.geom.num_coords == 0: - continue - geos = ApidaeTrekParser._convert_to_geos(feat.geom) - if geos.geom_type == 'MultiLineString': - geos = geos.merged # If possible we merge the MultiLineString into a LineString - if geos.geom_type == 'MultiLineString': - raise RowImportError(_("Feature geometry cannot be converted to a single continuous LineString feature")) - geoms.append(geos) - - full_geom = MultiLineString(geoms) - full_geom.srid = geoms[0].srid - full_geom = full_geom.merged # If possible we merge the MultiLineString into a LineString - if full_geom.geom_type == 'MultiLineString': - raise RowImportError(_("Geometries from various features cannot be converted to a single continuous LineString feature")) - - return full_geom - @staticmethod def _find_matching_practice_in_mapping(activities_ids, mapping): returned_practice_name = None diff --git a/geotrek/trekking/tests/data/apidae_trek_parser/a_trek_with_bad_geom.json b/geotrek/trekking/tests/data/apidae_trek_parser/a_trek_with_bad_geom.json new file mode 100644 index 0000000000..17c2c544c2 --- /dev/null +++ b/geotrek/trekking/tests/data/apidae_trek_parser/a_trek_with_bad_geom.json @@ -0,0 +1,277 @@ +{ + "numFound": 1, + "objetsTouristiques": [ + { + "id": 123123, + "informationsEquipement": { + "activites": [ + { + "id": 3184, + "libelleFr": "Sports pédestres", + "libelleEn": "Pedestrian sports" + }, + { + "id": 3333, + "libelleFr": "Itinéraire de randonnée pédestre", + "libelleEn": "Hiking itinerary" + } + ], + "itineraire": { + "dureeJournaliere": 150, + "referencesCartographiques": { + "libelleFr": "TOP 25 IGN 3531 OT", + "libelleEn": "Map IGN3531OT Top 25" + }, + "referencesTopoguides": { + "libelleFr": "Cartoguide en vente à l'Office de Tourisme", + "libelleEn": "Guidebook sold at the tourist board" + }, + "passagesDelicats": { + "libelleFr": "À éviter après de grosses pluies.", + "libelleEn": "Avoid after heavy rain." + }, + "itineraireType": "BOUCLE", + "itineraireBalise": "BALISE", + "precisionsBalisage": { + "libelleFr": "Suivre le balisage GR (blanc/rouge) ou GRP (jaune/rouge).", + "libelleEn": "Follow the GR (white / red) or GRP (yellow / red) markings." + }, + "naturesTerrain": [ + { + "id": 4245, + "libelleEn": "Suitable for all terrain strollers" + }, + { + "id": 4239, + "libelleEn": "Rock", + "libelleFr": "Rocher" + }, + { + "id": 4241, + "libelleEn": "Ground", + "libelleFr": "Terre" + } + ] + } + }, + "localisation": { + "adresse": { + "adresse1": "Parking sur la place du village", + "commune": { + "nom": "Sallanches" + } + }, + "geolocalisation": { + "complement": { + "libelleFr": "En voiture, rejoindre le village de Salanches.", + "libelleEn": "By car, go to the village of Sallanches.", + "libelleIt": "In auto, andare al villaggio di Sallances." + } + }, + "environnements": [ + { + "elementReferenceType": "Environnement", + "id": 135, + "libelleFr": "A la campagne", + "libelleEn": "In the country", + "libelleEs": "En el campo", + "libelleIt": "Campagna", + "libelleDe": "Auf dem Land", + "libelleNl": "Platteland" + } + ] + }, + "illustrations": [ + { + "type": "IMAGE", + "nom": { + "libelleEn": "The title of the picture" + }, + "legende": { + "libelleEn": "The legend of the picture" + }, + "copyright": { + "libelleEn": "The author of the picture" + }, + "traductionFichiers": [ + { + "url": "https://example.net/a_picture.jpg" + } + ] + }, + { + "type": "IMAGE", + "nom": { + "libelleEn": "This picture should not be imported" + }, + "legende": { + "libelleEn": "This picture should not be imported" + }, + "copyright": { + "libelleEn": "This picture should not be imported" + }, + "traductionFichiers": [ + { + "url": "https://example.net/another_picture.jpg" + } + ], + "dateLimiteDePublication": "2002-08-23T00:00:00.000+0000" + } + ], + "prestations": { + "typesClientele": [ + { + "id": 589, + "libelleFr": "Niveau rouge - Difficile", + "libelleEn": "Level red – hard", + "libelleEs": "Rojo", + "libelleIt": "Livello rosso – difficile", + "libelleDe": "Rot", + "libelleNl": "Rood" + } + ] + }, + "multimedias": [ + { + "type": "PLAN", + "traductionFichiers": [ + { + "url": "https://example.net/trace_with_multiple_not_continuous_features.gpx", + "extension": "gpx" + } + ] + } + ], + "nom": { + "libelleFr": "Une belle randonnée de test", + "libelleEn": "A great hike to test" + }, + "ouverture": { + "periodeEnClair": { + "libelleFr": "Ouvert toute l'année\n\nFermeture exceptionnelle en cas de pluie forte", + "libelleEn": "Open all year long\n\nExceptionally closed during heavy rain" + } + }, + "descriptionTarif": { + "indicationTarif": "PAYANT", + "tarifsEnClair": { + "libelleFr": "Montée en télésiège payante. 2 points de vente - télésiège Frastaz et Bois Noir.", + "libelleEn": "Ski lift ticket office: 2 shops - Frastaz and Bois Noir ski lifts." + } + }, + "presentation": { + "descriptifCourt": { + "libelleFr": "La description courte en français.", + "libelleEn": "The short description in english." + }, + "descriptifDetaille": { + "libelleFr": "La description détaillée en français.", + "libelleEn": "The longer description in english." + }, + "descriptifsThematises": [ + { + "theme": { + "id": 6527 + }, + "description": { + "libelleFr": "Départ : du parking de la Chapelle Saint Michel \r\n1/ Suivre le chemin qui part à droite, traversant le vallon.\r\n2/ Au carrefour tourner à droite et suivre la rivière\r\n3/ Retour à la chapelle en passant à travers le petit bois.", + "libelleEn": "Start: from the parking near the Chapelle Saint Michel \r\n1/ Follow the path starting at right-hand, cross the valley.\r\n2/ At the crossroad turn left and follow the river.\r\n3/ Back to the chapelle by the woods." + } + } + ], + "typologiesPromoSitra": [ + { + "elementReferenceType": "TypologiePromoSitra", + "id": 6157, + "libelleFr": "Géologie", + "libelleEn": "Geology", + "libelleEs": "Geología", + "libelleIt": "Geologia" + }, + { + "elementReferenceType": "TypologiePromoSitra", + "id": 6158, + "libelleFr": "Historique", + "libelleEn": "Historic", + "libelleEs": "Histórico", + "libelleIt": "Storico" + }, + { + "elementReferenceType": "TypologiePromoSitra", + "id": 1599, + "libelleFr": "Déconseillé par mauvais temps", + "libelleEn": "Not recommended in bad weather", + "libelleEs": "Desaconsejado en época de mal tiempo", + "libelleIt": "Sconsigliato con cattivo tempo" + }, + { + "elementReferenceType": "TypologiePromoSitra", + "id": 4971, + "libelleFr": "Inscrit au PDIPR", + "libelleEn": "Listed PDIPR", + "libelleEs": "Listado en el PDIPR", + "libelleIt": "Elencato nel PDIPR" + } + ] + }, + "gestion": { + "membreProprietaire": { + "nom": "Office de tourisme de Sallanches", + "siteWeb": "https://www.example.net/ot-sallanches" + } + }, + "informations": { + "moyensCommunication": [ + { + "type": { + "id": 201, + "libelleFr": "Téléphone", + "libelleEn": "Telephone", + "libelleEs": "Teléfono", + "libelleIt": "Telefono" + }, + "coordonnees": { + "fr": "01 23 45 67 89" + } + }, + { + "type": { + "id": 12345, + "libelleFr": "Signaux de fumée", + "libelleEn": "Smoke signals" + }, + "coordonnees": { + "fr": "1 gros nuage suivi de 2 petits" + } + }, + { + "type": { + "id": 204, + "libelleFr": "Mél", + "libelleEn": "e-mail", + "libelleEs": "Correo electrónico", + "libelleIt": "Posta elettronica", + "libelleDe": "E-Mail-Adresse", + "libelleNl": "Mail" + }, + "coordonnees": { + "fr": "accueil-rando@example.com" + } + }, + { + "type": { + "id": 205, + "libelleFr": "Site web (URL)", + "libelleEn": "Website", + "libelleEs": "Sitio web (URL)", + "libelleIt": "Sito Web (URL)" + }, + "coordonnees": { + "fr": "https://example.com/ma_rando.html" + } + } + ] + } + } + ] +} diff --git a/geotrek/trekking/tests/test_parsers.py b/geotrek/trekking/tests/test_parsers.py index be5e62b70b..908b7270f7 100644 --- a/geotrek/trekking/tests/test_parsers.py +++ b/geotrek/trekking/tests/test_parsers.py @@ -7,7 +7,6 @@ from unittest import skipIf from unittest.mock import Mock from urllib.parse import urlparse -from shutil import copy as copyfile from django.conf import settings from django.contrib.gis.geos import Point, LineString, MultiLineString, WKTWriter @@ -17,7 +16,6 @@ from django.test.utils import override_settings from geotrek.common.utils import testdata -from geotrek.common.utils.file_infos import get_encoding_file from geotrek.common.models import Theme, FileType, Attachment, Label, RecordSource, License from geotrek.common.tests.mixins import GeotrekParserTestMixin from geotrek.common.parsers import DownloadImportError @@ -1145,6 +1143,17 @@ def test_trek_not_imported_when_no_plan_attached(self, mocked_get): self.assertEqual(Trek.objects.count(), 0) self.assertIn('no attachment with the type "PLAN"', output_stdout.getvalue()) + @mock.patch('requests.get') + def test_trek_not_imported_when_bad_geometry(self, mocked_get): + output_stdout = StringIO() + mocked_get.side_effect = self.make_dummy_get('a_trek_with_bad_geom.json') + + call_command('import', 'geotrek.trekking.tests.test_parsers.TestApidaeTrekParser', verbosity=2, + stdout=output_stdout) + + self.assertEqual(Trek.objects.count(), 0) + self.assertIn('Geometries from various features cannot be converted to a single continuous LineString feature,', output_stdout.getvalue()) + @mock.patch('requests.get') def test_trek_not_imported_when_no_plan(self, mocked_get): output_stdout = StringIO() @@ -1252,38 +1261,6 @@ def test_trek_illustration_is_not_imported_on_missing_file_metadata(self, mocked self.assertEqual(Attachment.objects.count(), 0) -class TestApidaeTrekParserConvertEncodingFiles(TestCase): - data_dir = "geotrek/trekking/tests/data" - - def setUp(self): - if not os.path.exists(settings.TMP_DIR): - os.mkdir(settings.TMP_DIR) - - def test_fix_encoding_to_utf8(self): - file_name = f'{settings.TMP_DIR}/file_bad_encoding_tmp.kml' - copyfile(f'{self.data_dir}/file_bad_encoding.kml', file_name) - - encoding = get_encoding_file(file_name) - self.assertNotEqual(encoding, "utf-8") - - new_file_name = ApidaeTrekParser._maybe_fix_encoding_to_utf8(file_name) - - encoding = get_encoding_file(new_file_name) - self.assertEqual(encoding, "utf-8") - - def test_not_fix_encoding_to_utf8(self): - file_name = f'{settings.TMP_DIR}/file_good_encoding_tmp.kml' - copyfile(f'{self.data_dir}/file_good_encoding.kml', file_name) - - encoding = get_encoding_file(file_name) - self.assertEqual(encoding, "utf-8") - - new_file_name = ApidaeTrekParser._maybe_fix_encoding_to_utf8(file_name) - - encoding = get_encoding_file(new_file_name) - self.assertEqual(encoding, "utf-8") - - class TestApidaeTrekThemeParser(ApidaeTrekThemeParser): url = 'https://example.net/fake/api/' @@ -1487,115 +1464,6 @@ def test_it_returns_default_text_when_no_details(self): self.assertDictEqual(description, ApidaeTrekParser.default_trek_marking_description) -class GpxToGeomTests(SimpleTestCase): - - @staticmethod - def _get_gpx_from(filename): - with open(filename, 'r') as f: - gpx = f.read() - return bytes(gpx, 'utf-8') - - def test_gpx_with_waypoint_can_be_converted(self): - gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/apidae_test_trek.gpx') - - geom = ApidaeTrekParser._get_geom_from_gpx(gpx) - - self.assertEqual(geom.srid, 2154) - self.assertEqual(geom.geom_type, 'LineString') - self.assertEqual(len(geom.coords), 13) - first_point = geom.coords[0] - self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1) - self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1) - - def test_gpx_with_route_points_can_be_converted(self): - gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_route_points.gpx') - - geom = ApidaeTrekParser._get_geom_from_gpx(gpx) - - self.assertEqual(geom.srid, 2154) - self.assertEqual(geom.geom_type, 'LineString') - self.assertEqual(len(geom.coords), 13) - first_point = geom.coords[0] - self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1) - self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1) - - def test_it_raises_an_error_on_not_continuous_segments(self): - gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_not_continuous_segments.gpx') - - with self.assertRaises(RowImportError): - ApidaeTrekParser._get_geom_from_gpx(gpx) - - def test_it_handles_segment_with_single_point(self): - gpx = self._get_gpx_from( - 'geotrek/trekking/tests/data/apidae_trek_parser/trace_with_single_point_segment.gpx' - ) - geom = ApidaeTrekParser._get_geom_from_gpx(gpx) - - self.assertEqual(geom.srid, 2154) - self.assertEqual(geom.geom_type, 'LineString') - self.assertEqual(len(geom.coords), 13) - - def test_it_raises_an_error_when_no_linestring(self): - gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_no_feature.gpx') - - with self.assertRaises(RowImportError): - ApidaeTrekParser._get_geom_from_gpx(gpx) - - def test_it_handles_multiple_continuous_features(self): - gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_continuous_features.gpx') - geom = ApidaeTrekParser._get_geom_from_gpx(gpx) - - self.assertEqual(geom.srid, 2154) - self.assertEqual(geom.geom_type, 'LineString') - self.assertEqual(len(geom.coords), 12) - first_point = geom.coords[0] - self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1) - self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1) - - def test_it_handles_multiple_continuous_features_with_one_empty(self): - gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_continuous_features_and_one_empty.gpx') - geom = ApidaeTrekParser._get_geom_from_gpx(gpx) - - self.assertEqual(geom.srid, 2154) - self.assertEqual(geom.geom_type, 'LineString') - self.assertEqual(len(geom.coords), 12) - first_point = geom.coords[0] - self.assertAlmostEqual(first_point[0], 977776.9, delta=0.1) - self.assertAlmostEqual(first_point[1], 6547354.8, delta=0.1) - - def test_it_raises_error_on_multiple_not_continuous_features(self): - gpx = self._get_gpx_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_multiple_not_continuous_features.gpx') - with self.assertRaises(RowImportError): - ApidaeTrekParser._get_geom_from_gpx(gpx) - - -class KmlToGeomTests(SimpleTestCase): - - @staticmethod - def _get_kml_from(filename): - with open(filename, 'r') as f: - kml = f.read() - return bytes(kml, 'utf-8') - - def test_kml_can_be_converted(self): - kml = self._get_kml_from('geotrek/trekking/tests/data/apidae_trek_parser/trace.kml') - - geom = ApidaeTrekParser._get_geom_from_kml(kml) - - self.assertEqual(geom.srid, 2154) - self.assertEqual(geom.geom_type, 'LineString') - self.assertEqual(len(geom.coords), 61) - first_point = geom.coords[0] - self.assertAlmostEqual(first_point[0], 973160.8, delta=0.1) - self.assertAlmostEqual(first_point[1], 6529320.1, delta=0.1) - - def test_it_raises_exception_when_no_linear_data(self): - kml = self._get_kml_from('geotrek/trekking/tests/data/apidae_trek_parser/trace_with_no_line.kml') - - with self.assertRaises(RowImportError): - ApidaeTrekParser._get_geom_from_kml(kml) - - class GetPracticeNameFromActivities(SimpleTestCase): def test_it_considers_specific_activity_before_default_activity(self):