From 285324d2200f83f8f85d575483d71fd3b2bf5b3d Mon Sep 17 00:00:00 2001 From: rldhont Date: Fri, 26 Jan 2024 15:29:06 +0100 Subject: [PATCH] [Bugfix] Legend ON/OFF - do not use requestReady To activate legend ON/OFF, we have to use access control layer permissions instead of request ready. Funded by 3liz --- lizmap_server/legend_onoff_filter.py | 180 ++++++++++++--------------- lizmap_server/plugin.py | 9 +- test/conftest.py | 26 ++-- test/test_legend_onoff.py | 161 ++++++++++++++++++++++++ 4 files changed, 265 insertions(+), 111 deletions(-) create mode 100644 test/test_legend_onoff.py diff --git a/lizmap_server/legend_onoff_filter.py b/lizmap_server/legend_onoff_filter.py index d37605e4..d6e7ff9c 100644 --- a/lizmap_server/legend_onoff_filter.py +++ b/lizmap_server/legend_onoff_filter.py @@ -5,13 +5,81 @@ # File adapted by @rldhont, 3Liz -from qgis.core import QgsMapLayerStyle, QgsProject -from qgis.server import QgsServerFilter, QgsServerInterface +from qgis.core import QgsMapLayer, QgsProject +from qgis.server import QgsServerFilter, QgsAccessControlFilter, QgsServerInterface from lizmap_server.core import find_vector_layer from lizmap_server.logger import Logger, exception_handler +class LegendOnOffAccessControl(QgsAccessControlFilter): + + def __init__(self, server_interface: QgsServerInterface): + super().__init__(server_interface) + + self.iface = server_interface + + def _setup_legend(self, layer, qs, onoff): + for legend_layer in qs.split(';'): + layer_name, key_list = legend_layer.split(':') + # not empty + if layer_name == '' or key_list == '': + continue + # for the layer + if layer_name != layer.shortName() and layer_name != layer.name() and layer_name != layer.id(): + continue + + for key in key_list.split(','): + layer.renderer().checkLegendSymbolItem(key, onoff) + + def layerPermissions(self, layer: QgsMapLayer) -> QgsAccessControlFilter.LayerPermissions: + rights = super().layerPermissions(layer) + + handler = self.iface.requestHandler() + params = handler.parameterMap() + + if 'LEGEND_ON' not in params and 'LEGEND_OFF' not in params: + return rights + + styles = params['STYLES'].split(',') if 'STYLES' in params and params['STYLES'] else [] + + if len(styles) == 0: + styles = [params['STYLE']] if 'STYLE' in params and params['STYLE'] else [] + + layers = params['LAYERS'].split(',') if 'LAYERS' in params and params['LAYERS'] else [] + + if len(layers) == 0: + layers = [params['LAYER']] if 'LAYER' in params and params['LAYER'] else [] + + # noinspection PyBroadException + try: + style_map = dict(zip(layers, styles)) + except Exception: + style_map = {} + + sm = layer.styleManager() + style = sm.currentStyle() + + # check short name + if layer.shortName() in style_map: + style = style_map[layer.name()] + # check layer name + elif layer.name() in style_map: + style = style_map[layer.name()] + # check layer id + elif layer.id() in style_map: + style = style_map[layer.name()] + + sm.setCurrentStyle(style) + + if 'LEGEND_ON' in params: + self._setup_legend(layer, params['LEGEND_ON'], True) + if 'LEGEND_OFF' in params: + self._setup_legend(layer, params['LEGEND_OFF'], False) + + return rights + + class LegendOnOffFilter(QgsServerFilter): """Legend ON/OFF filter @@ -22,11 +90,8 @@ class LegendOnOffFilter(QgsServerFilter): def __init__(self, server_interface: QgsServerInterface): super().__init__(server_interface) - self.style_map = None - self.renderers_config = None - - def _setup_legend(self, qs: str, onoff: bool, project: QgsProject): + def _reset_legend(self, qs: str, project: QgsProject): if not qs or ':' not in qs: return @@ -48,35 +113,12 @@ def _setup_legend(self, qs: str, onoff: bool, project: QgsProject): layer_name)) continue - try: - if layer_name not in self.renderers_config: - sm = layer.styleManager() - current_style = sm.currentStyle() - try: - style_name = self.style_map[layer_name] - except KeyError: - style_name = current_style - xml = sm.style(style_name).xmlData() - sm.setCurrentStyle(style_name) - self.renderers_config[layer_name] = { - 'current_style': current_style, - 'xml': xml, - 'style_name': style_name, - } - - for key in key_list.split(','): - layer.renderer().checkLegendSymbolItem(key, onoff) - - except Exception as ex: - logger.warning( - 'Error setting legend {} for layer "{}" when configuring OWS call: {}'.format( - 'ON' if onoff else 'OFF', layer_name, ex)) - continue + for key in keys: + layer.renderer().checkLegendSymbolItem(key, True) @exception_handler - def requestReady(self): - - self.renderers_config = {} + def responseComplete(self): + """Restore legend customized renderers""" handler = self.serverInterface().requestHandler() logger = Logger() @@ -89,78 +131,10 @@ def requestReady(self): if 'LEGEND_ON' not in params and 'LEGEND_OFF' not in params: return - styles = params['STYLES'].split(',') if 'STYLES' in params and params['STYLES'] else [] - - if len(styles) == 0: - styles = [params['STYLE']] if 'STYLE' in params and params['STYLE'] else [] - - layers = params['LAYERS'].split(',') if 'LAYERS' in params and params['LAYERS'] else [] - - if len(layers) == 0: - layers = [params['LAYER']] if 'LAYER' in params and params['LAYER'] else [] - - # noinspection PyBroadException - try: - self.style_map = dict(zip(layers, styles)) - except Exception: - self.style_map = {} - # noinspection PyArgumentList project: QgsProject = QgsProject.instance() - logger.warning( - 'LegendOnOFF::requestReady : project instance : {} \n against MAP = {}'.format( - project.fileName(), - params.get('MAP', ''), - ) - ) if 'LEGEND_ON' in params: - self._setup_legend(params['LEGEND_ON'], True, project) + self._reset_legend(params['LEGEND_ON'], project) if 'LEGEND_OFF' in params: - self._setup_legend(params['LEGEND_OFF'], False, project) - - @exception_handler - def responseComplete(self): - """Restore legend customized renderers""" - - handler = self.serverInterface().requestHandler() - logger = Logger() - if not handler: - logger.critical( - 'LegendOnOffFilter plugin cannot be run in multithreading mode, skipping.') - return - - if len(self.renderers_config) == 0: - return - - params = handler.parameterMap() - - # noinspection PyArgumentList - project: QgsProject = QgsProject.instance() - logger.warning( - 'LegendOnOFF::responseComplete : project instance : {} \n against MAP = {}'.format( - project.fileName(), - params.get('MAP', '') - )) - - for layer_name, renderer_config in self.renderers_config.items(): - layer = find_vector_layer(layer_name, project) - if not layer: - logger.warning( - "ResponseComplete : Skipping the layer '{}' because it's not a vector layer".format(layer_name)) - continue - - try: - config = self.renderers_config[layer_name] - - sm = layer.styleManager() - sm.renameStyle(config['style_name'], 'dirty_to_remove') - sm.addStyle(config['style_name'], QgsMapLayerStyle(config['xml'])) - sm.setCurrentStyle(config['current_style']) - sm.removeStyle('dirty_to_remove') - - except Exception as ex: - logger.warning( - 'Error restoring renderer after legend ON/OFF for layer "{}" when configuring OWS call: {}'.format( - layer_name, ex)) - continue + self._reset_legend(params['LEGEND_OFF'], project) diff --git a/lizmap_server/plugin.py b/lizmap_server/plugin.py index 739ab715..81b85fd0 100755 --- a/lizmap_server/plugin.py +++ b/lizmap_server/plugin.py @@ -7,7 +7,7 @@ from lizmap_server.expression_service import ExpressionService from lizmap_server.get_feature_info import GetFeatureInfoFilter from lizmap_server.get_legend_graphic import GetLegendGraphicFilter -from lizmap_server.legend_onoff_filter import LegendOnOffFilter +from lizmap_server.legend_onoff_filter import LegendOnOffAccessControl, LegendOnOffFilter from lizmap_server.lizmap_accesscontrol import LizmapAccessControlFilter from lizmap_server.lizmap_filter import LizmapFilter from lizmap_server.lizmap_service import LizmapService @@ -90,3 +90,10 @@ def __init__(self, server_iface: QgsServerInterface) -> None: self.logger.critical('Error loading filter "legend on/off" : {}'.format(e)) raise self.logger.info('Filter "legend on/off" loaded') + + try: + server_iface.registerAccessControl(LegendOnOffAccessControl(self.server_iface), 175) + except Exception as e: + self.logger.critical('Error loading access control "legend on/off" : {}'.format(e)) + raise + self.logger.info('Access control "legend on/off" loaded') diff --git a/test/conftest.py b/test/conftest.py index 84350ace..c7cd7aee 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -150,24 +150,36 @@ def getplugin(self, name) -> Any: def getprojectpath(self, name: str) -> str: return self.datapath.join(name) + def getproject(self, name: str) -> QgsProject: + projectpath = self.getprojectpath(name) + if Qgis.QGIS_VERSION_INT >= 32601: + qgsproject = QgsProject(capabilities=Qgis.ProjectCapabilities()) + else: + qgsproject = QgsProject() + if not qgsproject.read(projectpath.strpath): + raise ValueError("Error reading project '%s':" % projectpath.strpath) + return qgsproject + def get(self, query: str, project: str=None, headers: Dict[str, str]={}) -> OWSResponse: """ Return server response from query """ request = QgsBufferServerRequest(query, QgsServerRequest.GetMethod, headers, None) response = QgsBufferServerResponse() if project is not None and not os.path.isabs(project): - projectpath = self.datapath.join(project) - if Qgis.QGIS_VERSION_INT >= 32601: - qgsproject = QgsProject(capabilities=Qgis.ProjectCapabilities()) - else: - qgsproject = QgsProject() - if not qgsproject.read(projectpath.strpath): - raise ValueError("Error reading project '%s':" % projectpath.strpath) + qgsproject = self.getproject(project) else: qgsproject = None self.server.handleRequest(request, response, project=qgsproject) return OWSResponse(response) + def getWithProject(self, query: str, project: QgsProject, headers: Dict[str, str]={}) -> OWSResponse: + """ Return server response from query + """ + request = QgsBufferServerRequest(query, QgsServerRequest.GetMethod, headers, None) + response = QgsBufferServerResponse() + self.server.handleRequest(request, response, project=project) + return OWSResponse(response) + return _Client() diff --git a/test/test_legend_onoff.py b/test/test_legend_onoff.py new file mode 100644 index 00000000..9070efce --- /dev/null +++ b/test/test_legend_onoff.py @@ -0,0 +1,161 @@ +import io +import logging + +from PIL import Image + +from test.utils import _build_query_string + +LOGGER = logging.getLogger('server') + +__copyright__ = 'Copyright 2023, 3Liz' +__license__ = 'GPL version 3' +__email__ = 'info@3liz.org' + +PROJECT = "legend.qgs" + +BASE_QUERY = { + 'MAP': PROJECT, + 'SERVICE': 'WMS', + 'REQUEST': 'GetMap', + 'VERSION': '1.3.0', + 'EXCEPTIONS': 'application%2Fvnd.ogc.se_inimage', + 'FORMAT': 'image%2Fpng', + 'DPI': '96', + 'TRANSPARENT': 'TRUE', + 'CRS': 'EPSG%3A4326', + 'BBOX': '47.51884820641368,-1.660771898670815,48.49033140295455,-0.35117445235345235', + 'WIDTH': '550', + 'HEIGHT': '408', +} + + +def test_categorized_symbol(client): + """ Test categorized symbol for layer. """ + project = client.getproject(PROJECT) + + qs = dict(BASE_QUERY) + qs['LAYERS'] = 'categorized' + rv = client.getWithProject(_build_query_string(qs), project) + assert rv.status_code == 200 + assert rv.headers.get('Content-Type', '').find('image/png') == 0 + + img = Image.open(io.BytesIO(rv.content)) + # save image for debugging + # img.save(client.datapath.join('legend_categorized-1.png').strpath) + assert img.format == 'PNG' + assert img.width == 550 + assert img.height == 408 + # remove transparency to reduce the number of colors + img = img.convert('RGB') + colors = img.getcolors(1024) + assert colors is not None + colors.sort(key=lambda color: color[0], reverse=True) + default_color_numbers = len(colors) + + qs = dict(BASE_QUERY) + qs['LAYERS'] = 'categorized' + qs['LEGEND_ON'] = 'categorized:1' + qs['LEGEND_OFF'] = 'categorized:0,2,3,4' + rv = client.getWithProject(_build_query_string(qs), project) + assert rv.status_code == 200 + assert rv.headers.get('Content-Type', '').find('image/png') == 0 + + img = Image.open(io.BytesIO(rv.content)) + # save image for debugging + # img.save(client.datapath.join('legend_categorized_onoff.png').strpath) + assert img.format == 'PNG' + assert img.width == 550 + assert img.height == 408 + # remove transparency to reduce the number of colors + img = img.convert('RGB') + colors = img.getcolors(1024) + assert colors is not None + colors.sort(key=lambda color: color[0], reverse=True) + # less colors because 1 feature displayed + assert len(colors) < default_color_numbers, f'not {len(colors)} < {default_color_numbers}' + + qs = dict(BASE_QUERY) + qs['LAYERS'] = 'categorized' + rv = client.getWithProject(_build_query_string(qs), project) + assert rv.status_code == 200 + assert rv.headers.get('Content-Type', '').find('image/png') == 0 + + img = Image.open(io.BytesIO(rv.content)) + # save image for debugging + # img.save(client.datapath.join('legend_categorized-2.png').strpath) + assert img.format == 'PNG' + assert img.width == 550 + assert img.height == 408 + # remove transparency to reduce the number of colors + img = img.convert('RGB') + colors = img.getcolors(1024) + assert colors is not None + colors.sort(key=lambda color: color[0], reverse=True) + # same colors as the first request - the legend has been well reset + assert len(colors) == default_color_numbers, f'{len(colors)} != {default_color_numbers}' + + +def test_simple_rule_based(client): + """ Test rule based layer, simple conversion from categorized. """ + project = client.getproject(PROJECT) + + qs = dict(BASE_QUERY) + qs['LAYERS'] = 'rule_based' + rv = client.getWithProject(_build_query_string(qs), project) + assert rv.status_code == 200 + assert rv.headers.get('Content-Type', '').find('image/png') == 0 + + img = Image.open(io.BytesIO(rv.content)) + # save image for debugging + # img.save(client.datapath.join('legend_rule_based-1.png').strpath) + assert img.format == 'PNG' + assert img.width == 550 + assert img.height == 408 + # remove transparency to reduce the number of colors + img = img.convert('RGB') + colors = img.getcolors(1024) + assert colors is not None + colors.sort(key=lambda color: color[0], reverse=True) + default_color_numbers = len(colors) + + qs = dict(BASE_QUERY) + qs['LAYERS'] = 'rule_based' + qs['LEGEND_ON'] = 'rule_based:{49db22fd-3aed-495d-9140-4b82f50fdcfd}' + qs['LEGEND_OFF'] = 'rule_based:{1e75ef9b-1c18-46c1-b7f7-b16efc5bb791},{37b9b766-5309-4617-b0a4-1122168cbfd0},{bd0ace82-eee5-46c3-ad70-f8ecb7d50bb3},{77b34ffc-2198-4450-8e4d-270df282a81b}' + rv = client.getWithProject(_build_query_string(qs), project) + assert rv.status_code == 200 + assert rv.headers.get('Content-Type', '').find('image/png') == 0 + + img = Image.open(io.BytesIO(rv.content)) + # save image for debugging + # img.save(client.datapath.join('legend_rule_based_onoff.png').strpath) + assert img.format == 'PNG' + assert img.width == 550 + assert img.height == 408 + # remove transparency to reduce the number of colors + img = img.convert('RGB') + colors = img.getcolors(1024) + assert colors is not None + colors.sort(key=lambda color: color[0], reverse=True) + # less colors because 1 feature displayed + assert len(colors) < default_color_numbers, f'not {len(colors)} < {default_color_numbers}' + + qs = dict(BASE_QUERY) + qs['LAYERS'] = 'categorized' + rv = client.getWithProject(_build_query_string(qs), project) + assert rv.status_code == 200 + assert rv.headers.get('Content-Type', '').find('image/png') == 0 + + img = Image.open(io.BytesIO(rv.content)) + # save image for debugging + # img.save(client.datapath.join('legend_rule_based-2.png').strpath) + assert img.format == 'PNG' + assert img.width == 550 + assert img.height == 408 + # remove transparency to reduce the number of colors + img = img.convert('RGB') + colors = img.getcolors(1024) + assert colors is not None + colors.sort(key=lambda color: color[0], reverse=True) + # same colors as the first request - the legend has been well reset + assert len(colors) == default_color_numbers, f'{len(colors)} != {default_color_numbers}'