diff --git a/pygeoapi/api/processes.py b/pygeoapi/api/processes.py index 298fc5918..90570f844 100644 --- a/pygeoapi/api/processes.py +++ b/pygeoapi/api/processes.py @@ -484,7 +484,8 @@ def execute_process(api: API, request: APIRequest, process_id, data_dict, execution_mode=execution_mode, requested_outputs=requested_outputs, subscriber=subscriber, - requested_response=requested_response) + requested_response=requested_response, + request_headers=request.headers) job_id, mime_type, outputs, status, additional_headers = result headers.update(additional_headers or {}) diff --git a/pygeoapi/process/base.py b/pygeoapi/process/base.py index 87c05a3cd..ad865e615 100644 --- a/pygeoapi/process/base.py +++ b/pygeoapi/process/base.py @@ -53,6 +53,7 @@ def __init__(self, processor_def: dict, process_metadata: dict): self.name = processor_def['name'] self.metadata = process_metadata self.supports_outputs = False + self.supports_request_headers = False def set_job_id(self, job_id: str) -> None: """ @@ -70,7 +71,8 @@ def set_job_id(self, job_id: str) -> None: pass - def execute(self, data: dict, outputs: Optional[dict] = None + def execute(self, data: dict, outputs: Optional[dict] = None, + request_headers: Optional[dict] = None ) -> Tuple[str, Any]: """ execute the process @@ -81,6 +83,8 @@ def execute(self, data: dict, outputs: Optional[dict] = None required outputs - defaults to all outputs. The value of any key may be an object and include the property `transmissionMode` - defaults to `value`. + :param request_headers: `dict` optionally specifying the headers from + the request :returns: tuple of MIME type and process response (string or bytes, or dict) """ diff --git a/pygeoapi/process/manager/base.py b/pygeoapi/process/manager/base.py index e8c468421..20f3a2b01 100644 --- a/pygeoapi/process/manager/base.py +++ b/pygeoapi/process/manager/base.py @@ -76,6 +76,7 @@ def __init__(self, manager_def: dict): self.name = manager_def['name'] self.is_async = False self.supports_subscribing = False + self.supports_request_headers = False self.connection = manager_def.get('connection') self.output_dir = manager_def.get('output_dir') @@ -194,7 +195,8 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str, data_dict: dict, requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, - requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa + requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa + request_headers: Optional[dict] = None ) -> Tuple[str, None, JobStatus]: """ This private execution handler executes a process in a background @@ -215,13 +217,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str, :param subscriber: optional `Subscriber` specifying callback URLs :param requested_response: `RequestedResponse` optionally specifying raw or document (default is `raw`) + :param request_headers: `dict` optionally specifying the headers from + the request :returns: tuple of None (i.e. initial response payload) and JobStatus.accepted (i.e. initial job status) """ args = (p, job_id, data_dict, requested_outputs, subscriber, - requested_response) + requested_response, request_headers) _process = dummy.Process(target=self._execute_handler_sync, args=args) _process.start() @@ -232,7 +236,8 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str, data_dict: dict, requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, - requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa + requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa + request_headers: Optional[dict] = None ) -> Tuple[str, Any, JobStatus]: """ Synchronous execution handler @@ -254,16 +259,20 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str, :param subscriber: optional `Subscriber` specifying callback URLs :param requested_response: `RequestedResponse` optionally specifying raw or document (default is `raw`) + :param request_headers: `dict` optionally specifying the headers from + the request :returns: tuple of MIME type, response payload and status """ extra_execute_parameters = {} - # only pass requested_outputs if supported, + # only pass requested_outputs and request_headers if supported, # otherwise this breaks existing processes if p.supports_outputs: extra_execute_parameters['outputs'] = requested_outputs + if p.supports_request_headers: + extra_execute_parameters['request_headers'] = request_headers self._send_in_progress_notification(subscriber) @@ -358,7 +367,8 @@ def execute_process( execution_mode: Optional[RequestedProcessExecutionMode] = None, requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, - requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa + requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa + request_headers: Optional[dict] = None ) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]: """ Default process execution handler @@ -377,6 +387,8 @@ def execute_process( :param subscriber: `Subscriber` optionally specifying callback urls :param requested_response: `RequestedResponse` optionally specifying raw or document (default is `raw`) + :param request_headers: `dict` optionally specifying the headers from + the request :raises UnknownProcessError: if the input process_id does not @@ -443,10 +455,12 @@ def execute_process( } self.add_job(job_metadata) - # only pass subscriber if supported, otherwise this breaks + # only pass subscriber and headers if supported, otherwise this breaks # existing managers if self.supports_subscribing: extra_execute_handler_parameters['subscriber'] = subscriber + if self.supports_request_headers: + extra_execute_handler_parameters['request_headers'] = request_headers # noqa # TODO: handler's response could also be allowed to include more HTTP # headers diff --git a/pygeoapi/process/manager/dummy.py b/pygeoapi/process/manager/dummy.py index 6528f53a7..f581ee7ce 100644 --- a/pygeoapi/process/manager/dummy.py +++ b/pygeoapi/process/manager/dummy.py @@ -79,7 +79,8 @@ def execute_process( execution_mode: Optional[RequestedProcessExecutionMode] = None, requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, - requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa + requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa + request_headers: Optional[dict] = None ) -> Tuple[str, str, Any, JobStatus, Optional[Dict[str, str]]]: """ Default process execution handler @@ -95,6 +96,8 @@ def execute_process( :param subscriber: `Subscriber` optionally specifying callback urls :param requested_response: `RequestedResponse` optionally specifying raw or document (default is `raw`) + :param request_headers: `dict` optionally specifying the headers from + the request :raises UnknownProcessError: if the input process_id does not correspond to a known process diff --git a/pygeoapi/process/manager/mongodb_.py b/pygeoapi/process/manager/mongodb_.py index 44bce6dbe..02fa2117a 100644 --- a/pygeoapi/process/manager/mongodb_.py +++ b/pygeoapi/process/manager/mongodb_.py @@ -47,6 +47,7 @@ def __init__(self, manager_def): super().__init__(manager_def) self.is_async = True self.supports_subscribing = True + self.supports_request_headers = True def _connect(self): try: diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index 16d25ab8f..d8da1d6b0 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -79,6 +79,7 @@ def __init__(self, manager_def: dict): self.is_async = True self.id_field = 'identifier' self.supports_subscribing = True + self.supports_request_headers = True self.connection = manager_def['connection'] try: diff --git a/pygeoapi/process/manager/tinydb_.py b/pygeoapi/process/manager/tinydb_.py index b04d29a49..48b18af60 100644 --- a/pygeoapi/process/manager/tinydb_.py +++ b/pygeoapi/process/manager/tinydb_.py @@ -63,6 +63,7 @@ def __init__(self, manager_def: dict): super().__init__(manager_def) self.is_async = True self.supports_subscribing = True + self.supports_request_headers = True @contextmanager def _db(self):