diff --git a/django_project/core/settings/project.py b/django_project/core/settings/project.py index e44245d..c4e99cf 100644 --- a/django_project/core/settings/project.py +++ b/django_project/core/settings/project.py @@ -10,7 +10,7 @@ from django.db import connection from boto3.s3.transfer import TransferConfig from .contrib import * # noqa -from .utils import code_release_version +from .utils import code_release_version, code_commit_release_version ALLOWED_HOSTS = ['*'] ADMINS = ( @@ -44,6 +44,7 @@ ) CODE_RELEASE_VERSION = code_release_version() +CODE_COMMIT_HASH = code_commit_release_version() # s3 # TODO: set CacheControl in object_parameters+endpoint_url diff --git a/django_project/core/settings/utils.py b/django_project/core/settings/utils.py index 10d37c9..f9f2922 100644 --- a/django_project/core/settings/utils.py +++ b/django_project/core/settings/utils.py @@ -39,6 +39,16 @@ def code_release_version(): return '0.0.1' +def code_commit_release_version(): + """ Read code commit release version from file.""" + version = absolute_path('version', 'commit.txt') + if os.path.exists(version): + commit = (open(version, 'rb').read()).decode("utf-8") + if commit: + return commit + return 'no-info' + + class UUIDEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, uuid.UUID): diff --git a/django_project/cplus/definitions/constants.py b/django_project/cplus/definitions/constants.py index 98066e5..789e3a3 100644 --- a/django_project/cplus/definitions/constants.py +++ b/django_project/cplus/definitions/constants.py @@ -7,6 +7,7 @@ NCS_PATHWAY_SEGMENT = "ncs_pathways" NCS_CARBON_SEGMENT = "ncs_carbon" PRIORITY_LAYERS_SEGMENT = "priority_layers" +NPV_PRIORITY_LAYERS_SEGMENT = "npv" # Naming for outputs sub-folder relative to base directory OUTPUTS_SEGMENT = "outputs" @@ -31,10 +32,31 @@ STYLE_ATTRIBUTE = "style" USER_DEFINED_ATTRIBUTE = "user_defined" UUID_ATTRIBUTE = "uuid" +YEARS_ATTRIBUTE = "years" +DISCOUNT_ATTRIBUTE = "discount" +ABSOLUTE_NPV_ATTRIBUTE = "absolute_npv" +NORMALIZED_NPV_ATTRIBUTE = "normalized_npv" +YEARLY_RATES_ATTRIBUTE = "yearly_rates" +ENABLED_ATTRIBUTE = "enabled" +MIN_VALUE_ATTRIBUTE = "minimum_value" +MAX_VALUE_ATTRIBUTE = "maximum_value" +COMPUTED_ATTRIBUTE = "use_computed" +NPV_MAPPINGS_ATTRIBUTE = "mappings" +REMOVE_EXISTING_ATTRIBUTE = "remove_existing" ACTIVITY_IDENTIFIER_PROPERTY = "activity_identifier" +NPV_COLLECTION_PROPERTY = "npv_collection" # Option / settings keys CPLUS_OPTIONS_KEY = "cplus_main" LOG_OPTIONS_KEY = "cplus_log" REPORTS_OPTIONS_KEY = "cplus_report" + +# Headers for financial NPV computation +YEAR_HEADER = "Year" +TOTAL_PROJECTED_COSTS_HEADER = "Projected Total Costs/ha (US$)" +TOTAL_PROJECTED_REVENUES_HEADER = "Projected Total Revenues/ha (US$)" +DISCOUNTED_VALUE_HEADER = "Discounted Value (US$)" +MAX_YEARS = 99 + +NO_DATA_VALUE = -9999 \ No newline at end of file diff --git a/django_project/cplus/models/base.py b/django_project/cplus/models/base.py index c7758b4..0522b62 100644 --- a/django_project/cplus/models/base.py +++ b/django_project/cplus/models/base.py @@ -205,6 +205,12 @@ def __eq__(self, other) -> bool: "LayerModelComponentType", bound=LayerModelComponent ) +class PriorityLayerType(IntEnum): + """Type of priority weighting layer.""" + + DEFAULT = 0 + NPV = 1 + @dataclasses.dataclass class PriorityLayer(BaseModelComponent): @@ -213,6 +219,7 @@ class PriorityLayer(BaseModelComponent): groups: list selected: bool = False path: str = "" + type: PriorityLayerType = PriorityLayerType.DEFAULT @dataclasses.dataclass diff --git a/django_project/cplus/models/financial.py b/django_project/cplus/models/financial.py new file mode 100644 index 0000000..ad1ddf1 --- /dev/null +++ b/django_project/cplus/models/financial.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 -*- + +""" Data models for the financial elements of the tool.""" + +import dataclasses +from enum import IntEnum +import typing + +from .base import Activity + + +@dataclasses.dataclass +class NpvParameters: + """Parameters for computing an activity's NPV.""" + + years: int + discount: float + absolute_npv: float = 0.0 + normalized_npv: float = 0.0 + # Each tuple contains 3 elements i.e. revenue, costs and discount rates + yearly_rates: typing.List[tuple] = dataclasses.field(default_factory=list) + + +@dataclasses.dataclass +class ActivityNpv: + """Mapping of the NPV parameters to the corresponding Activity model.""" + + params: NpvParameters + enabled: bool + activity: typing.Optional[Activity] + + @property + def activity_id(self) -> str: + """Gets the identifier of the activity model. + + :returns: The unique identifier of the activity model else an + empty string if no activity has been set. + """ + if not self.activity: + return "" + + return str(self.activity.uuid) + + @property + def base_name(self) -> str: + """Returns a proposed name for the activity NPV. + + An empty string will be return id the `activity` attribute + is not set. + + :returns: Proposed base name for the activity NPV. + :rtype: str + """ + if self.activity is None: + return "" + + return f"{self.activity.name} NPV Norm" + + +@dataclasses.dataclass +class ActivityNpvCollection: + """Collection for all ActivityNpvMapping configurations that have been + specified by the user. + """ + + minimum_value: float + maximum_value: float + use_computed: bool = True + remove_existing: bool = False + mappings: typing.List[ActivityNpv] = dataclasses.field(default_factory=list) + + def activity_npv(self, activity_identifier: str) -> typing.Optional[ActivityNpv]: + """Gets the mapping of an activity's NPV mapping if defined. + + :param activity_identifier: Unique identifier of an activity whose + NPV mapping is to be retrieved. + :type activity_identifier: str + + :returns: The activity's NPV mapping else None if not found. + :rtype: ActivityNpv + """ + matching_mapping = [ + activity_npv + for activity_npv in self.mappings + if activity_npv.activity_id == activity_identifier + ] + + return None if len(matching_mapping) == 0 else matching_mapping[0] + + def update_computed_normalization_range(self) -> bool: + """Update the minimum and maximum normalization values + based on the absolute values of the existing ActivityNpv + objects. + + Values for disabled activity NPVs will be excluded from + the computation. + + :returns: True if the min/max values were updated else False if + there are no mappings or valid absolute NPV values defined. + """ + if len(self.mappings) == 0: + return False + + valid_npv_values = [ + activity_npv.params.absolute_npv + for activity_npv in self.mappings + if activity_npv.params.absolute_npv is not None and activity_npv.enabled + ] + + if len(valid_npv_values) == 0: + return False + + self.minimum_value = min(valid_npv_values) + self.maximum_value = max(valid_npv_values) + + return True + + def normalize_npvs(self) -> bool: + """Normalize the NPV values of the activities using the specified + normalization range. + + If the absolute NPV values are less than or greater than the + normalization range, then they will be truncated to 0.0 and 1.0 + respectively. To avoid such a situation from occurring, it is recommended + to make sure that the ranges are synchronized using the latest absolute + NPV values hence call `update_computed_normalization_range` before + normalizing the NPVs. + + :returns: True if the NPVs were successfully normalized else False due + to various reasons such as if the minimum value is greater than the + maximum value or if the min/max values are the same. + """ + if self.minimum_value > self.maximum_value: + return False + + norm_range = float(self.maximum_value - self.minimum_value) + + if norm_range == 0.0: + return False + + for activity_npv in self.mappings: + absolute_npv = activity_npv.params.absolute_npv + if not absolute_npv: + continue + + if absolute_npv <= self.minimum_value: + normalized_npv = 0.0 + elif absolute_npv >= self.maximum_value: + normalized_npv = 1.0 + else: + normalized_npv = (absolute_npv - self.minimum_value) / norm_range + + activity_npv.params.normalized_npv = normalized_npv + + return True + + +@dataclasses.dataclass +class ActivityNpvPwl: + """Convenience class that contains parameters for creating + a PWL raster layer. + """ + + npv: ActivityNpv + extent: typing.List[float] + crs: str + pixel_size: float diff --git a/django_project/cplus/models/helpers.py b/django_project/cplus/models/helpers.py index 169a786..7c2c131 100644 --- a/django_project/cplus/models/helpers.py +++ b/django_project/cplus/models/helpers.py @@ -17,18 +17,31 @@ SpatialExtent, ) from ..definitions.constants import ( + ACTIVITY_IDENTIFIER_PROPERTY, + ABSOLUTE_NPV_ATTRIBUTE, CARBON_PATHS_ATTRIBUTE, + COMPUTED_ATTRIBUTE, + DISCOUNT_ATTRIBUTE, + ENABLED_ATTRIBUTE, STYLE_ATTRIBUTE, NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE, LAYER_TYPE_ATTRIBUTE, + NPV_MAPPINGS_ATTRIBUTE, + MAX_VALUE_ATTRIBUTE, + MIN_VALUE_ATTRIBUTE, + NORMALIZED_NPV_ATTRIBUTE, PATH_ATTRIBUTE, PIXEL_VALUE_ATTRIBUTE, PRIORITY_LAYERS_SEGMENT, + REMOVE_EXISTING_ATTRIBUTE, USER_DEFINED_ATTRIBUTE, UUID_ATTRIBUTE, + YEARS_ATTRIBUTE, + YEARLY_RATES_ATTRIBUTE, ) from ..definitions.defaults import DEFAULT_CRS_ID +from .financial import ActivityNpv, ActivityNpvCollection, NpvParameters from ..utils.helper import log @@ -425,3 +438,157 @@ def extent_to_project_crs_extent( log(f"{e}, using the default input extent.") return input_rect + + +def activity_npv_to_dict(activity_npv: ActivityNpv) -> dict: + """Converts an ActivityNpv object to a dictionary representation. + + :returns: A dictionary containing attribute name-value pairs. + :rtype: dict + """ + return { + YEARS_ATTRIBUTE: activity_npv.params.years, + DISCOUNT_ATTRIBUTE: activity_npv.params.discount, + ABSOLUTE_NPV_ATTRIBUTE: activity_npv.params.absolute_npv, + NORMALIZED_NPV_ATTRIBUTE: activity_npv.params.normalized_npv, + YEARLY_RATES_ATTRIBUTE: activity_npv.params.yearly_rates, + ENABLED_ATTRIBUTE: activity_npv.enabled, + ACTIVITY_IDENTIFIER_PROPERTY: activity_npv.activity_id, + } + + +def create_activity_npv(activity_npv_dict: dict) -> typing.Optional[ActivityNpv]: + """Creates an ActivityNpv object from the equivalent dictionary + representation. + + Please note that the `activity` attribute of the `ActivityNpv` object will be + `None` hence, will have to be set manually by extracting the corresponding `Activity` + from the activity UUID. + + :param activity_npv_dict: Dictionary containing information for deserializing + to the ActivityNpv object. + :type activity_npv_dict: dict + + :returns: ActivityNpv deserialized from the dictionary representation. + :rtype: ActivityNpv + """ + args = [] + if YEARS_ATTRIBUTE in activity_npv_dict: + args.append(activity_npv_dict[YEARS_ATTRIBUTE]) + + if DISCOUNT_ATTRIBUTE in activity_npv_dict: + args.append(activity_npv_dict[DISCOUNT_ATTRIBUTE]) + + if ABSOLUTE_NPV_ATTRIBUTE in activity_npv_dict: + args.append(activity_npv_dict[ABSOLUTE_NPV_ATTRIBUTE]) + + if NORMALIZED_NPV_ATTRIBUTE in activity_npv_dict: + args.append(activity_npv_dict[NORMALIZED_NPV_ATTRIBUTE]) + + if len(args) < 4: + return None + + yearly_rates = [] + if YEARLY_RATES_ATTRIBUTE in activity_npv_dict: + yearly_rates = activity_npv_dict[YEARLY_RATES_ATTRIBUTE] + + npv_params = NpvParameters(*args) + npv_params.yearly_rates = yearly_rates + + npv_enabled = False + if ENABLED_ATTRIBUTE in activity_npv_dict: + npv_enabled = activity_npv_dict[ENABLED_ATTRIBUTE] + + return ActivityNpv(npv_params, npv_enabled, None) + + +def activity_npv_collection_to_dict(activity_collection: ActivityNpvCollection) -> dict: + """Converts the activity NPV collection object to the + dictionary representation. + + :returns: A dictionary containing the attribute name-value pairs + of an activity NPV collection object + :rtype: dict + """ + npv_collection_dict = { + MIN_VALUE_ATTRIBUTE: activity_collection.minimum_value, + MAX_VALUE_ATTRIBUTE: activity_collection.maximum_value, + COMPUTED_ATTRIBUTE: activity_collection.use_computed, + REMOVE_EXISTING_ATTRIBUTE: activity_collection.remove_existing, + } + + mapping_dict = list(map(activity_npv_to_dict, activity_collection.mappings)) + npv_collection_dict[NPV_MAPPINGS_ATTRIBUTE] = mapping_dict + + return npv_collection_dict + + +def create_activity_npv_collection( + activity_collection_dict: dict, reference_activities: typing.List[Activity] = None +) -> typing.Optional[ActivityNpvCollection]: + """Creates an activity NPV collection object from the corresponding + dictionary representation. + + :param activity_collection_dict: Dictionary representation containing + information of an activity NPV collection object. + :type activity_collection_dict: dict + + :param reference_activities: Optional list of activities that will be + used to lookup when deserializing the ActivityNpv objects. + :type reference_activities: list + + :returns: Activity NPV collection object from the dictionary representation + or None if the source dictionary is invalid. + :rtype: ActivityNpvCollection + """ + if len(activity_collection_dict) == 0: + return None + + ref_activities_by_uuid = { + str(activity.uuid): activity for activity in reference_activities + } + + args = [] + + # Minimum value + if MIN_VALUE_ATTRIBUTE in activity_collection_dict: + args.append(activity_collection_dict[MIN_VALUE_ATTRIBUTE]) + + # Maximum value + if MAX_VALUE_ATTRIBUTE in activity_collection_dict: + args.append(activity_collection_dict[MAX_VALUE_ATTRIBUTE]) + + if len(args) < 2: + return None + + activity_npv_collection = ActivityNpvCollection(*args) + + # Use computed + if COMPUTED_ATTRIBUTE in activity_collection_dict: + use_computed = activity_collection_dict[COMPUTED_ATTRIBUTE] + activity_npv_collection.use_computed = use_computed + + # Remove existing + if REMOVE_EXISTING_ATTRIBUTE in activity_collection_dict: + remove_existing = activity_collection_dict[REMOVE_EXISTING_ATTRIBUTE] + activity_npv_collection.remove_existing = remove_existing + + if NPV_MAPPINGS_ATTRIBUTE in activity_collection_dict: + mappings_dict = activity_collection_dict[NPV_MAPPINGS_ATTRIBUTE] + npv_mappings = [] + for md in mappings_dict: + activity_npv = create_activity_npv(md) + if activity_npv is None: + continue + + # Get the corresponding activity from the unique identifier + if ACTIVITY_IDENTIFIER_PROPERTY in md: + activity_id = md[ACTIVITY_IDENTIFIER_PROPERTY] + if activity_id in ref_activities_by_uuid: + ref_activity = ref_activities_by_uuid[activity_id] + activity_npv.activity = ref_activity + npv_mappings.append(activity_npv) + + activity_npv_collection.mappings = npv_mappings + + return activity_npv_collection diff --git a/django_project/cplus/utils/conf.py b/django_project/cplus/utils/conf.py index d18781c..9e00be5 100644 --- a/django_project/cplus/utils/conf.py +++ b/django_project/cplus/utils/conf.py @@ -5,6 +5,7 @@ import contextlib import dataclasses +import datetime import enum import json import os.path @@ -21,6 +22,7 @@ STYLE_ATTRIBUTE, NCS_CARBON_SEGMENT, NCS_PATHWAY_SEGMENT, + NPV_COLLECTION_PROPERTY, PATH_ATTRIBUTE, PATHWAYS_ATTRIBUTE, PIXEL_VALUE_ATTRIBUTE, @@ -32,16 +34,20 @@ Activity, NcsPathway, Scenario, + ScenarioResult, SpatialExtent, ) +from cplus.models.financial import ActivityNpvCollection from cplus.models.helpers import ( + activity_npv_collection_to_dict, create_activity, + create_activity_npv_collection, create_ncs_pathway, layer_component_to_dict, ncs_pathway_to_dict, ) -from cplus.utils.helper import log +from cplus.utils.helper import log, todict, CustomJsonEncoder @contextlib.contextmanager @@ -85,14 +91,30 @@ def from_qgs_settings(cls, identifier: str, settings: QgsSettings): :rtype: ScenarioSettings """ + activities_list = settings.value("activities", []) + weighted_activities_list = settings.value("activities", []) + + activities = [ + settings_manager.get_activity(activity_uuid) + for activity_uuid in activities_list + ] + weighted_activities = [ + settings_manager.get_activity(activity_uuid) + for activity_uuid in weighted_activities_list + ] + return cls( uuid=uuid.UUID(identifier), name=settings.value("name", None), description=settings.value("description", None), + extent=[], + activities=activities, + weighted_activities=weighted_activities, + priority_layer_groups=[], ) @classmethod - def get_scenario_extent(cls): + def get_scenario_extent(cls, identifier): """Fetches Scenario extent from the passed scenario settings. @@ -100,9 +122,11 @@ def get_scenario_extent(cls): :returns: Spatial extent instance extent :rtype: SpatialExtent """ - spatial_key = "extent/spatial" + spatial_key = ( + f"{settings_manager._get_scenario_settings_base(identifier)}/extent/spatial" + ) - with qgis_settings(spatial_key, cls) as settings: + with qgis_settings(spatial_key) as settings: bbox = settings.value("bbox", None) spatial_extent = SpatialExtent(bbox=bbox) @@ -135,6 +159,7 @@ class Settings(enum.Enum): # Last selected data directory LAST_DATA_DIR = "last_data_dir" + LAST_MASK_DIR = "last_mask_dir" # Advanced settings BASE_DIR = "advanced/base_dir" @@ -173,15 +198,25 @@ class Settings(enum.Enum): LANDUSE_WEIGHTED = "landuse_weighted" HIGHEST_POSITION = "highest_position" + # Processing option + PROCESSING_TYPE = "processing_type" + + # DEBUG + DEBUG = "debug" + DEV_MODE = "dev_mode" + BASE_API_URL = "base_api_url" + class SettingsManager(QtCore.QObject): """Manages saving/loading settings for the plugin in QgsSettings.""" BASE_GROUP_NAME: str = "cplus_plugin" SCENARIO_GROUP_NAME: str = "scenarios" + SCENARIO_RESULTS_GROUP_NAME: str = "scenarios_results" PRIORITY_GROUP_NAME: str = "priority_groups" PRIORITY_LAYERS_GROUP_NAME: str = "priority_layers" NCS_PATHWAY_BASE: str = "ncs_pathways" + LAYER_MAPPING_BASE: str = "layer_mapping" ACTIVITY_BASE: str = "activities" @@ -272,6 +307,21 @@ def _get_scenario_settings_base(self, identifier): f"{str(identifier)}" ) + def _get_scenario_results_settings_base(self, identifier): + """Gets the scenario results settings base url. + + :param identifier: Scenario identifier + :type identifier: uuid.UUID + + :returns: Scenario settings base group + :rtype: str + """ + return ( + f"{self.BASE_GROUP_NAME}/" + f"{self.SCENARIO_RESULTS_GROUP_NAME}/" + f"{str(identifier)}" + ) + def save_scenario(self, scenario_settings): """Save the passed scenario settings into the plugin settings @@ -282,10 +332,19 @@ def save_scenario(self, scenario_settings): self.save_scenario_extent(settings_key, scenario_settings.extent) + scenario_activities_ids = [ + str(activity.uuid) for activity in scenario_settings.activities + ] + weighted_activities_ids = [ + str(activity.uuid) for activity in scenario_settings.weighted_activities + ] + with qgis_settings(settings_key) as settings: + settings.setValue("uuid", scenario_settings.uuid) settings.setValue("name", scenario_settings.name) settings.setValue("description", scenario_settings.description) - settings.setValue("uuid", scenario_settings.uuid) + settings.setValue("activities", scenario_activities_ids) + settings.setValue("weighted_activities", weighted_activities_ids) def save_scenario_extent(self, key, extent): """Saves the scenario extent into plugin settings @@ -301,28 +360,28 @@ def save_scenario_extent(self, key, extent): extent (SpatialExtent): Scenario extent key (str): QgsSettings group key """ - spatial_extent = extent.spatial.bbox + spatial_extent = extent.bbox spatial_key = f"{key}/extent/spatial/" with qgis_settings(spatial_key) as settings: settings.setValue("bbox", spatial_extent) - def get_scenario(self, identifier): - """Retrieves the scenario that matches the passed identifier. - - :param identifier: Scenario identifier - :type identifier: str - - :returns: Scenario settings instance - :rtype: ScenarioSettings - """ - - settings_key = self._get_scenario_settings_base(identifier) - with qgis_settings(settings_key) as settings: - scenario_settings = ScenarioSettings.from_qgs_settings( - str(identifier), settings - ) - return scenario_settings + # def get_scenario(self, identifier): + # """Retrieves the scenario that matches the passed identifier. + # + # :param identifier: Scenario identifier + # :type identifier: str + # + # :returns: Scenario settings instance + # :rtype: ScenarioSettings + # """ + # + # settings_key = self._get_scenario_settings_base(identifier) + # with qgis_settings(settings_key) as settings: + # scenario_settings = ScenarioSettings.from_qgs_settings( + # str(identifier), settings + # ) + # return scenario_settings def get_scenario(self, scenario_id): """Retrieves the first scenario that matched the passed scenario id. @@ -334,17 +393,18 @@ def get_scenario(self, scenario_id): :rtype: ScenarioSettings """ - result = [] with qgis_settings( f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_GROUP_NAME}" ) as settings: - for uuid in settings.childGroups(): - scenario_settings_key = self._get_scenario_settings_base(uuid) + for scenario_uuid in settings.childGroups(): + scenario_settings_key = self._get_scenario_settings_base(scenario_uuid) with qgis_settings(scenario_settings_key) as scenario_settings: - scenario = ScenarioSettings.from_qgs_settings( - uuid, scenario_settings - ) - if scenario.id == scenario_id: + if scenario_uuid == scenario_id: + scenario = ScenarioSettings.from_qgs_settings( + scenario_uuid, scenario_settings + ) + + scenario.extent = scenario.get_scenario_extent(scenario_uuid) return scenario return None @@ -358,18 +418,30 @@ def get_scenarios(self): with qgis_settings( f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_GROUP_NAME}" ) as settings: - for uuid in settings.childGroups(): - scenario_settings_key = self._get_scenario_settings_base(uuid) + for scenario_uuid in settings.childGroups(): + scenario_settings_key = self._get_scenario_settings_base(scenario_uuid) with qgis_settings(scenario_settings_key) as scenario_settings: scenario = ScenarioSettings.from_qgs_settings( - uuid, scenario_settings - ) - scenario.extent = self.get_scenario_ - result.append( - ScenarioSettings.from_qgs_settings(uuid, scenario_settings) + scenario_uuid, scenario_settings ) + scenario.extent = scenario.get_scenario_extent(scenario_uuid) + result.append(scenario) return result + def delete_scenario(self, scenario_id): + """Delete the scenario with the passed scenarion id. + + :param scenario_id: Scenario identifier + :type scenario_id: str + """ + + with qgis_settings( + f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_GROUP_NAME}" + ) as settings: + for scenario_identifier in settings.childGroups(): + if str(scenario_identifier) == str(scenario_id): + settings.remove(scenario_identifier) + def delete_all_scenarios(self): """Deletes all the plugin scenarios settings.""" with qgis_settings( @@ -378,6 +450,122 @@ def delete_all_scenarios(self): for scenario_name in settings.childGroups(): settings.remove(scenario_name) + def save_scenario_result(self, scenario_result, scenario_id): + """Save the scenario results plugin settings + + :param scenario_settings: Scenario settings + :type scenario_settings: ScenarioSettings + """ + settings_key = self._get_scenario_results_settings_base(scenario_id) + + analysis_output = json.dumps(scenario_result.analysis_output) + + with qgis_settings(settings_key) as settings: + settings.setValue("scenario_id", scenario_id) + settings.setValue( + "created_date", + scenario_result.created_date.strftime("%Y_%m_%d_%H_%M_%S"), + ) + settings.setValue("analysis_output", analysis_output) + settings.setValue("output_layer_name", scenario_result.output_layer_name) + settings.setValue("scenario_directory", scenario_result.scenario_directory) + + def get_scenario_result(self, scenario_id): + """Retrieves the scenario result that matched the passed scenario id. + + :param scenario_id: Scenario id + :type scenario_id: str + + :returns: Scenario result + :rtype: ScenarioSettings + """ + + scenario_settings_key = self._get_scenario_results_settings_base(scenario_id) + with qgis_settings(scenario_settings_key) as scenario_settings: + created_date = scenario_settings.value("created_date") + analysis_output = scenario_settings.value("analysis_output") + output_layer_name = scenario_settings.value("output_layer_name") + scenario_directory = scenario_settings.value("scenario_directory") + + try: + created_date = datetime.datetime.strptime( + created_date, "%Y_%m_%d_%H_%M_%S" + ) + analysis_output = json.loads(analysis_output) + except Exception as e: + log(f"Problem fetching scenario result, {e}") + return None + + return ScenarioResult( + scenario=None, + created_date=created_date, + analysis_output=analysis_output, + output_layer_name=output_layer_name, + scenario_directory=scenario_directory, + ) + return None + + def get_scenarios_results(self): + """Gets all the saved scenarios results. + + :returns: List of the scenario results + :rtype: list + """ + result = [] + with qgis_settings( + f"{self.BASE_GROUP_NAME}/{self.SCENARIO_RESULTS_GROUP_NAME}" + ) as settings: + for uuid in settings.childGroups(): + scenario_settings_key = self._get_scenario_results_settings_base(uuid) + with qgis_settings(scenario_settings_key) as scenario_settings: + created_date = scenario_settings.value("created_date") + analysis_output = scenario_settings.value("analysis_output") + output_layer_name = scenario_settings.value("output_layer_name") + scenario_directory = scenario_settings.value("scenario_directory") + + try: + created_date = datetime.datetime.strptime( + created_date, "%Y_%m_%d_%H_%M_%S" + ) + analysis_output = json.loads(analysis_output) + except Exception as e: + log(f"Problem fetching scenario result, {e}") + return None + + result.append( + ScenarioResult( + scenario=None, + created_date=created_date, + analysis_output=analysis_output, + output_layer_name=output_layer_name, + scenario_directory=scenario_directory, + ) + ) + return result + + def delete_scenario_result(self, scenario_id): + """Delete the scenario result that contains the scenario id. + + :param scenario_id: Scenario identifier + :type scenario_id: str + """ + + with qgis_settings( + f"{self.BASE_GROUP_NAME}/" f"{self.SCENARIO_RESULTS_GROUP_NAME}" + ) as settings: + for scenario_identifier in settings.childGroups(): + if str(scenario_identifier) == str(scenario_id): + settings.remove(scenario_identifier) + + def delete_all_scenarios_results(self): + """Deletes all the plugin scenarios results settings.""" + with qgis_settings( + f"{self.BASE_GROUP_NAME}/{self.SCENARIO_GROUP_NAME}/" + f"{self.SCENARIO_RESULTS_GROUP_NAME}" + ) as settings: + for scenario_result in settings.childGroups(): + settings.remove(scenario_result) + def _get_priority_layers_settings_base(self, identifier) -> str: """Gets the priority layers settings base url. @@ -430,6 +618,7 @@ def get_priority_layer(self, identifier) -> typing.Dict: priority_layer["user_defined"] = settings.value( "user_defined", defaultValue=True, type=bool ) + priority_layer["type"] = settings.value("type", defaultValue=0, type=int) priority_layer["groups"] = groups return priority_layer @@ -467,6 +656,9 @@ def get_priority_layers(self) -> typing.List: "user_defined": priority_settings.value( "user_defined", defaultValue=True, type=bool ), + "type": priority_settings.value( + "type", defaultValue=0, type=int + ), "groups": groups, } priority_layer_list.append(layer) @@ -543,6 +735,7 @@ def save_priority_layer(self, priority_layer): settings.setValue("path", priority_layer["path"]) settings.setValue("selected", priority_layer.get("selected", False)) settings.setValue("user_defined", priority_layer.get("user_defined", True)) + settings.setValue("type", priority_layer.get("type", 0)) groups_key = f"{settings_key}/groups" with qgis_settings(groups_key) as groups_settings: for group_id in groups_settings.childGroups(): @@ -550,7 +743,7 @@ def save_priority_layer(self, priority_layer): for group in groups: group_key = f"{groups_key}/{group['name']}" with qgis_settings(group_key) as group_settings: - group_settings.setValue("uuid", group.get("uuid")) + group_settings.setValue("uuid", str(group.get("uuid"))) group_settings.setValue("name", group["name"]) group_settings.setValue("value", group["value"]) @@ -712,6 +905,74 @@ def delete_priority_groups(self): for priority_group in settings.childGroups(): settings.remove(priority_group) + def _get_layer_mappings_settings_base(self) -> str: + """Returns the path for Layer Mapping settings. + + :returns: Base path to Layer Mapping group. + :rtype: str + """ + return f"{self.BASE_GROUP_NAME}/{self.LAYER_MAPPING_BASE}" + + def get_all_layer_mapping(self) -> typing.Dict: + """Return all layer mapping.""" + layer_mapping = {} + + layer_mapping_root = self._get_layer_mappings_settings_base() + with qgis_settings(layer_mapping_root) as settings: + keys = settings.childKeys() + for k in keys: + layer_raw = settings.value(k, dict()) + if len(layer_raw) > 0: + try: + layer = json.loads(layer_raw) + layer_mapping[k] = layer + except json.JSONDecodeError: + log("Layer Mapping JSON is invalid") + return layer_mapping + + def get_layer_mapping(self, identifier) -> typing.Dict: + """Retrieves the layer mapping that matches the passed identifier. + + :param identifier: Layer mapping identifier + :type identifier: str path + + :returns: Layer mapping + :rtype: typing.Dict + """ + + layer_mapping = {} + + layer_mapping_root = self._get_layer_mappings_settings_base() + + with qgis_settings(layer_mapping_root) as settings: + layer = settings.value(identifier, dict()) + if len(layer) > 0: + try: + layer_mapping = json.loads(layer) + except json.JSONDecodeError: + log("Layer Mapping JSON is invalid") + return layer_mapping + + def save_layer_mapping(self, input_layer, identifier=None): + """Save the layer mapping into the plugin settings + + :param input_layer: Layer mapping + :type input_layer: dict + :param identifier: file identifier using path + :type identifier: str + """ + + if not identifier: + identifier = input_layer["path"].replace(os.sep, "--") + settings_key = self._get_layer_mappings_settings_base() + + with qgis_settings(settings_key) as settings: + settings.setValue(identifier, json.dumps(input_layer)) + + def remove_layer_mapping(self, identifier): + """Remove layer mapping from settings.""" + self.remove(f"{self.LAYER_MAPPING_BASE}/{identifier}") + def _get_ncs_pathway_settings_base(self) -> str: """Returns the path for NCS pathway settings. @@ -907,7 +1168,7 @@ def save_activity(self, activity: typing.Union[Activity, dict]): if len(priority_layers) > 0: activity[PRIORITY_LAYERS_SEGMENT] = priority_layers - activity_str = json.dumps(activity) + activity_str = json.dumps(todict(activity), cls=CustomJsonEncoder) activity_uuid = activity[UUID_ATTRIBUTE] activity_root = self._get_activity_settings_base() @@ -1036,5 +1297,37 @@ def remove_activity(self, activity_uuid: str): if self.get_activity(activity_uuid) is not None: self.remove(f"{self.ACTIVITY_BASE}/{activity_uuid}") + def get_npv_collection(self) -> typing.Optional[ActivityNpvCollection]: + """Gets the collection of NPV mappings of activities. + + :returns: The collection of activity NPV mappings or None + if not defined. + :rtype: ActivityNpvCollection + """ + npv_collection_str = self.get_value(NPV_COLLECTION_PROPERTY, None) + if not npv_collection_str: + return None + + npv_collection_dict = {} + try: + npv_collection_dict = json.loads(npv_collection_str) + except json.JSONDecodeError: + log("ActivityNPVCollection JSON is invalid.") + + return create_activity_npv_collection( + npv_collection_dict, self.get_all_activities() + ) + + def save_npv_collection(self, npv_collection: ActivityNpvCollection): + """Saves the activity NPV collection in the settings as a serialized + JSON string. + + :param npv_collection: Activity NPV collection serialized to a JSON string. + :type npv_collection: ActivityNpvCollection + """ + npv_collection_dict = activity_npv_collection_to_dict(npv_collection) + npv_collection_str = json.dumps(npv_collection_dict) + self.set_value(NPV_COLLECTION_PROPERTY, npv_collection_str) + settings_manager = SettingsManager() diff --git a/django_project/cplus/utils/helper.py b/django_project/cplus/utils/helper.py index 8be01ed..d771988 100644 --- a/django_project/cplus/utils/helper.py +++ b/django_project/cplus/utils/helper.py @@ -6,7 +6,10 @@ import os import uuid +import json +import datetime from pathlib import Path +from uuid import UUID from qgis.PyQt import QtCore, QtGui from qgis.core import ( @@ -557,3 +560,60 @@ def align_rasters( ) return input_layer_output, None + + +class CustomJsonEncoder(json.JSONEncoder): + """ + Custom JSON encoder which handles UUID and datetime + """ + + def default(self, obj): + if isinstance(obj, UUID): + # if the obj is uuid, we simply return the value of uuid + return obj.hex + if isinstance(obj, datetime.datetime): + # if the obj is uuid, we simply return the value of uuid + return obj.isoformat() + return json.JSONEncoder.default(self, obj) + + +def todict(obj, classkey=None): + """ + Convert any object to dictionary + """ + + if isinstance(obj, dict): + data = {} + for k, v in obj.items(): + data[k] = todict(v, classkey) + return data + elif hasattr(obj, "_ast"): + return todict(obj._ast()) + elif hasattr(obj, "__iter__") and not isinstance(obj, str): + return [todict(v, classkey) for v in obj] + elif hasattr(obj, "__dict__"): + data = dict( + [ + (key, todict(value, classkey)) + for key, value in obj.__dict__.items() + if not callable(value) and not key.startswith("_") + ] + ) + if classkey is not None and hasattr(obj, "__class__"): + data[classkey] = obj.__class__.__name__ + return data + else: + return obj + + +def get_layer_type(file_path: str): + """ + Get layer type code from file path + """ + file_name, file_extension = os.path.splitext(file_path) + if file_extension.lower() in [".tif", ".tiff"]: + return 0 + elif file_extension.lower() in [".geojson", ".zip", ".shp"]: + return 1 + else: + return -1 diff --git a/django_project/cplus_api/migrations/0008_scenariotask_code_version.py b/django_project/cplus_api/migrations/0008_scenariotask_code_version.py new file mode 100644 index 0000000..0cc91b0 --- /dev/null +++ b/django_project/cplus_api/migrations/0008_scenariotask_code_version.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.7 on 2024-06-06 23:45 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('cplus_api', '0007_alter_inputlayer_component_type'), + ] + + operations = [ + migrations.AddField( + model_name='scenariotask', + name='code_version', + field=models.TextField(blank=True, default=''), + ), + ] diff --git a/django_project/cplus_api/models/scenario.py b/django_project/cplus_api/models/scenario.py index abb46c3..1e1a81b 100644 --- a/django_project/cplus_api/models/scenario.py +++ b/django_project/cplus_api/models/scenario.py @@ -31,6 +31,10 @@ class ScenarioTask(BaseTaskRequest): updated_detail = models.JSONField( default=dict ) + code_version = models.TextField( + default='', + blank=True + ) def task_on_sent(self, task_id, task_name, parameters): super().task_on_sent(task_id, task_name, parameters) diff --git a/django_project/cplus_api/tasks/runner.py b/django_project/cplus_api/tasks/runner.py index d0e4bf6..ce8c91f 100644 --- a/django_project/cplus_api/tasks/runner.py +++ b/django_project/cplus_api/tasks/runner.py @@ -4,6 +4,7 @@ import logging import time import json +from django.conf import settings from core.settings.utils import UUIDEncoder from cplus_api.models.scenario import ScenarioTask @@ -30,6 +31,10 @@ def create_scenario_task_runner(scenario_task: ScenarioTask): def run_scenario_analysis_task(scenario_task_id): # pragma: no cover scenario_task = ScenarioTask.objects.get(id=scenario_task_id) scenario_task.task_on_started() + scenario_task.code_version = ( + f"{settings.CODE_RELEASE_VERSION}-{settings.CODE_COMMIT_HASH}" + ) + scenario_task.save(update_fields=['code_version']) logger.info( f'Triggered run_scenario_analysis_task {str(scenario_task.uuid)}') from qgis.core import QgsApplication diff --git a/django_project/version/commit.txt b/django_project/version/commit.txt index c72f08c..b43dce5 100644 --- a/django_project/version/commit.txt +++ b/django_project/version/commit.txt @@ -1 +1 @@ -initial \ No newline at end of file +a7139ef4365e4db8bcdc9b5eba8d83944d546100