Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionnally provide request HTTP headers to processors #1899

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pygeoapi/api/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ def execute_process(api: API, request: APIRequest,
process_id, data_dict, execution_mode=execution_mode,
requested_outputs=requested_outputs,
subscriber=subscriber,
requested_response=requested_response)
requested_response=requested_response,
request_headers=request.headers)
job_id, mime_type, outputs, status, additional_headers = result
headers.update(additional_headers or {})

Expand Down
6 changes: 5 additions & 1 deletion pygeoapi/process/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, processor_def: dict, process_metadata: dict):
self.name = processor_def['name']
self.metadata = process_metadata
self.supports_outputs = False
self.supports_request_headers = False

def set_job_id(self, job_id: str) -> None:
"""
Expand All @@ -70,7 +71,8 @@ def set_job_id(self, job_id: str) -> None:

pass

def execute(self, data: dict, outputs: Optional[dict] = None
def execute(self, data: dict, outputs: Optional[dict] = None,
request_headers: Optional[dict] = None
) -> Tuple[str, Any]:
"""
execute the process
Expand All @@ -81,6 +83,8 @@ def execute(self, data: dict, outputs: Optional[dict] = None
required outputs - defaults to all outputs.
The value of any key may be an object and include the
property `transmissionMode` - defaults to `value`.
:param request_headers: `dict` optionally specifying the headers from
the request
:returns: tuple of MIME type and process response
(string or bytes, or dict)
"""
Expand Down
26 changes: 20 additions & 6 deletions pygeoapi/process/manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(self, manager_def: dict):
self.name = manager_def['name']
self.is_async = False
self.supports_subscribing = False
self.supports_request_headers = False
self.connection = manager_def.get('connection')
self.output_dir = manager_def.get('output_dir')

Expand Down Expand Up @@ -194,7 +195,8 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
data_dict: dict,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, None, JobStatus]:
"""
This private execution handler executes a process in a background
Expand All @@ -215,13 +217,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
:param subscriber: optional `Subscriber` specifying callback URLs
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request

:returns: tuple of None (i.e. initial response payload)
and JobStatus.accepted (i.e. initial job status)
"""

args = (p, job_id, data_dict, requested_outputs, subscriber,
requested_response)
requested_response, request_headers)

_process = dummy.Process(target=self._execute_handler_sync, args=args)
_process.start()
Expand All @@ -232,7 +236,8 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
data_dict: dict,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, Any, JobStatus]:
"""
Synchronous execution handler
Expand All @@ -254,16 +259,20 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
:param subscriber: optional `Subscriber` specifying callback URLs
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request

:returns: tuple of MIME type, response payload and status
"""

extra_execute_parameters = {}

# only pass requested_outputs if supported,
# only pass requested_outputs and request_headers if supported,
# otherwise this breaks existing processes
if p.supports_outputs:
extra_execute_parameters['outputs'] = requested_outputs
if p.supports_request_headers:
extra_execute_parameters['request_headers'] = request_headers

self._send_in_progress_notification(subscriber)

Expand Down Expand Up @@ -358,7 +367,8 @@ def execute_process(
execution_mode: Optional[RequestedProcessExecutionMode] = None,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
Expand All @@ -377,6 +387,8 @@ def execute_process(
:param subscriber: `Subscriber` optionally specifying callback urls
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request


:raises UnknownProcessError: if the input process_id does not
Expand Down Expand Up @@ -443,10 +455,12 @@ def execute_process(
}
self.add_job(job_metadata)

# only pass subscriber if supported, otherwise this breaks
# only pass subscriber and headers if supported, otherwise this breaks
# existing managers
if self.supports_subscribing:
extra_execute_handler_parameters['subscriber'] = subscriber
if self.supports_request_headers:
extra_execute_handler_parameters['request_headers'] = request_headers # noqa

# TODO: handler's response could also be allowed to include more HTTP
# headers
Expand Down
5 changes: 4 additions & 1 deletion pygeoapi/process/manager/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ def execute_process(
execution_mode: Optional[RequestedProcessExecutionMode] = None,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value # noqa
requested_response: Optional[RequestedResponse] = RequestedResponse.raw.value, # noqa
request_headers: Optional[dict] = None
) -> Tuple[str, str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
Expand All @@ -95,6 +96,8 @@ def execute_process(
:param subscriber: `Subscriber` optionally specifying callback urls
:param requested_response: `RequestedResponse` optionally specifying
raw or document (default is `raw`)
:param request_headers: `dict` optionally specifying the headers from
the request

:raises UnknownProcessError: if the input process_id does not
correspond to a known process
Expand Down
1 change: 1 addition & 0 deletions pygeoapi/process/manager/mongodb_.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, manager_def):
super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True
self.supports_request_headers = True

def _connect(self):
try:
Expand Down
1 change: 1 addition & 0 deletions pygeoapi/process/manager/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(self, manager_def: dict):
self.is_async = True
self.id_field = 'identifier'
self.supports_subscribing = True
self.supports_request_headers = True
self.connection = manager_def['connection']

try:
Expand Down
1 change: 1 addition & 0 deletions pygeoapi/process/manager/tinydb_.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, manager_def: dict):
super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True
self.supports_request_headers = True

@contextmanager
def _db(self):
Expand Down