Skip to content

Commit

Permalink
[Bugfix] Legend ON/OFF - do not use requestReady
Browse files Browse the repository at this point in the history
To activate legend ON/OFF, we have to use access control layer permissions instead of request ready.

Funded by 3liz
  • Loading branch information
rldhont committed Jan 26, 2024
1 parent 7864453 commit 285324d
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 111 deletions.
180 changes: 77 additions & 103 deletions lizmap_server/legend_onoff_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,81 @@

# File adapted by @rldhont, 3Liz

from qgis.core import QgsMapLayerStyle, QgsProject
from qgis.server import QgsServerFilter, QgsServerInterface
from qgis.core import QgsMapLayer, QgsProject
from qgis.server import QgsServerFilter, QgsAccessControlFilter, QgsServerInterface

from lizmap_server.core import find_vector_layer
from lizmap_server.logger import Logger, exception_handler


class LegendOnOffAccessControl(QgsAccessControlFilter):

def __init__(self, server_interface: QgsServerInterface):
super().__init__(server_interface)

self.iface = server_interface

def _setup_legend(self, layer, qs, onoff):
for legend_layer in qs.split(';'):
layer_name, key_list = legend_layer.split(':')
# not empty
if layer_name == '' or key_list == '':
continue
# for the layer
if layer_name != layer.shortName() and layer_name != layer.name() and layer_name != layer.id():
continue

for key in key_list.split(','):
layer.renderer().checkLegendSymbolItem(key, onoff)

def layerPermissions(self, layer: QgsMapLayer) -> QgsAccessControlFilter.LayerPermissions:
rights = super().layerPermissions(layer)

handler = self.iface.requestHandler()
params = handler.parameterMap()

if 'LEGEND_ON' not in params and 'LEGEND_OFF' not in params:
return rights

styles = params['STYLES'].split(',') if 'STYLES' in params and params['STYLES'] else []

if len(styles) == 0:
styles = [params['STYLE']] if 'STYLE' in params and params['STYLE'] else []

layers = params['LAYERS'].split(',') if 'LAYERS' in params and params['LAYERS'] else []

if len(layers) == 0:
layers = [params['LAYER']] if 'LAYER' in params and params['LAYER'] else []

# noinspection PyBroadException
try:
style_map = dict(zip(layers, styles))
except Exception:
style_map = {}

sm = layer.styleManager()
style = sm.currentStyle()

# check short name
if layer.shortName() in style_map:
style = style_map[layer.name()]
# check layer name
elif layer.name() in style_map:
style = style_map[layer.name()]
# check layer id
elif layer.id() in style_map:
style = style_map[layer.name()]

sm.setCurrentStyle(style)

if 'LEGEND_ON' in params:
self._setup_legend(layer, params['LEGEND_ON'], True)
if 'LEGEND_OFF' in params:
self._setup_legend(layer, params['LEGEND_OFF'], False)

return rights


class LegendOnOffFilter(QgsServerFilter):
"""Legend ON/OFF filter
Expand All @@ -22,11 +90,8 @@ class LegendOnOffFilter(QgsServerFilter):

def __init__(self, server_interface: QgsServerInterface):
super().__init__(server_interface)
self.style_map = None
self.renderers_config = None

def _setup_legend(self, qs: str, onoff: bool, project: QgsProject):

def _reset_legend(self, qs: str, project: QgsProject):
if not qs or ':' not in qs:
return

Expand All @@ -48,35 +113,12 @@ def _setup_legend(self, qs: str, onoff: bool, project: QgsProject):
layer_name))
continue

try:
if layer_name not in self.renderers_config:
sm = layer.styleManager()
current_style = sm.currentStyle()
try:
style_name = self.style_map[layer_name]
except KeyError:
style_name = current_style
xml = sm.style(style_name).xmlData()
sm.setCurrentStyle(style_name)
self.renderers_config[layer_name] = {
'current_style': current_style,
'xml': xml,
'style_name': style_name,
}

for key in key_list.split(','):
layer.renderer().checkLegendSymbolItem(key, onoff)

except Exception as ex:
logger.warning(
'Error setting legend {} for layer "{}" when configuring OWS call: {}'.format(
'ON' if onoff else 'OFF', layer_name, ex))
continue
for key in keys:
layer.renderer().checkLegendSymbolItem(key, True)

@exception_handler
def requestReady(self):

self.renderers_config = {}
def responseComplete(self):
"""Restore legend customized renderers"""

handler = self.serverInterface().requestHandler()
logger = Logger()
Expand All @@ -89,78 +131,10 @@ def requestReady(self):
if 'LEGEND_ON' not in params and 'LEGEND_OFF' not in params:
return

styles = params['STYLES'].split(',') if 'STYLES' in params and params['STYLES'] else []

if len(styles) == 0:
styles = [params['STYLE']] if 'STYLE' in params and params['STYLE'] else []

layers = params['LAYERS'].split(',') if 'LAYERS' in params and params['LAYERS'] else []

if len(layers) == 0:
layers = [params['LAYER']] if 'LAYER' in params and params['LAYER'] else []

# noinspection PyBroadException
try:
self.style_map = dict(zip(layers, styles))
except Exception:
self.style_map = {}

# noinspection PyArgumentList
project: QgsProject = QgsProject.instance()
logger.warning(
'LegendOnOFF::requestReady : project instance : {} \n against MAP = {}'.format(
project.fileName(),
params.get('MAP', ''),
)
)

if 'LEGEND_ON' in params:
self._setup_legend(params['LEGEND_ON'], True, project)
self._reset_legend(params['LEGEND_ON'], project)
if 'LEGEND_OFF' in params:
self._setup_legend(params['LEGEND_OFF'], False, project)

@exception_handler
def responseComplete(self):
"""Restore legend customized renderers"""

handler = self.serverInterface().requestHandler()
logger = Logger()
if not handler:
logger.critical(
'LegendOnOffFilter plugin cannot be run in multithreading mode, skipping.')
return

if len(self.renderers_config) == 0:
return

params = handler.parameterMap()

# noinspection PyArgumentList
project: QgsProject = QgsProject.instance()
logger.warning(
'LegendOnOFF::responseComplete : project instance : {} \n against MAP = {}'.format(
project.fileName(),
params.get('MAP', '')
))

for layer_name, renderer_config in self.renderers_config.items():
layer = find_vector_layer(layer_name, project)
if not layer:
logger.warning(
"ResponseComplete : Skipping the layer '{}' because it's not a vector layer".format(layer_name))
continue

try:
config = self.renderers_config[layer_name]

sm = layer.styleManager()
sm.renameStyle(config['style_name'], 'dirty_to_remove')
sm.addStyle(config['style_name'], QgsMapLayerStyle(config['xml']))
sm.setCurrentStyle(config['current_style'])
sm.removeStyle('dirty_to_remove')

except Exception as ex:
logger.warning(
'Error restoring renderer after legend ON/OFF for layer "{}" when configuring OWS call: {}'.format(
layer_name, ex))
continue
self._reset_legend(params['LEGEND_OFF'], project)
9 changes: 8 additions & 1 deletion lizmap_server/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from lizmap_server.expression_service import ExpressionService
from lizmap_server.get_feature_info import GetFeatureInfoFilter
from lizmap_server.get_legend_graphic import GetLegendGraphicFilter
from lizmap_server.legend_onoff_filter import LegendOnOffFilter
from lizmap_server.legend_onoff_filter import LegendOnOffAccessControl, LegendOnOffFilter
from lizmap_server.lizmap_accesscontrol import LizmapAccessControlFilter
from lizmap_server.lizmap_filter import LizmapFilter
from lizmap_server.lizmap_service import LizmapService
Expand Down Expand Up @@ -90,3 +90,10 @@ def __init__(self, server_iface: QgsServerInterface) -> None:
self.logger.critical('Error loading filter "legend on/off" : {}'.format(e))
raise
self.logger.info('Filter "legend on/off" loaded')

try:
server_iface.registerAccessControl(LegendOnOffAccessControl(self.server_iface), 175)
except Exception as e:
self.logger.critical('Error loading access control "legend on/off" : {}'.format(e))
raise
self.logger.info('Access control "legend on/off" loaded')
26 changes: 19 additions & 7 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,36 @@ def getplugin(self, name) -> Any:
def getprojectpath(self, name: str) -> str:
return self.datapath.join(name)

def getproject(self, name: str) -> QgsProject:
projectpath = self.getprojectpath(name)
if Qgis.QGIS_VERSION_INT >= 32601:
qgsproject = QgsProject(capabilities=Qgis.ProjectCapabilities())
else:
qgsproject = QgsProject()
if not qgsproject.read(projectpath.strpath):
raise ValueError("Error reading project '%s':" % projectpath.strpath)
return qgsproject

def get(self, query: str, project: str=None, headers: Dict[str, str]={}) -> OWSResponse:
""" Return server response from query
"""
request = QgsBufferServerRequest(query, QgsServerRequest.GetMethod, headers, None)
response = QgsBufferServerResponse()
if project is not None and not os.path.isabs(project):
projectpath = self.datapath.join(project)
if Qgis.QGIS_VERSION_INT >= 32601:
qgsproject = QgsProject(capabilities=Qgis.ProjectCapabilities())
else:
qgsproject = QgsProject()
if not qgsproject.read(projectpath.strpath):
raise ValueError("Error reading project '%s':" % projectpath.strpath)
qgsproject = self.getproject(project)
else:
qgsproject = None
self.server.handleRequest(request, response, project=qgsproject)
return OWSResponse(response)

def getWithProject(self, query: str, project: QgsProject, headers: Dict[str, str]={}) -> OWSResponse:
""" Return server response from query
"""
request = QgsBufferServerRequest(query, QgsServerRequest.GetMethod, headers, None)
response = QgsBufferServerResponse()
self.server.handleRequest(request, response, project=project)
return OWSResponse(response)

return _Client()


Expand Down
Loading

0 comments on commit 285324d

Please sign in to comment.