|
1 | 1 | import asyncio
|
2 | 2 | import logging
|
3 | 3 | import time
|
4 |
| -from typing import Awaitable, Callable |
| 4 | +from functools import wraps |
| 5 | +from typing import Any, Callable |
5 | 6 |
|
6 |
| -from fastapi import HTTPException, Request, Response |
| 7 | +from fastapi import HTTPException, Request |
7 | 8 | from fastapi.applications import FastAPI
|
8 |
| -from starlette.middleware.base import BaseHTTPMiddleware |
9 |
| -from starlette.responses import PlainTextResponse |
| 9 | +from fastapi.dependencies.utils import ( |
| 10 | + get_body_field, |
| 11 | + get_dependant, |
| 12 | + get_parameterless_sub_dependant, |
| 13 | +) |
| 14 | +from fastapi.responses import PlainTextResponse |
| 15 | +from fastapi.routing import APIRoute, request_response |
10 | 16 | from starlette.status import HTTP_504_GATEWAY_TIMEOUT
|
11 |
| -from starlette.types import Message |
12 | 17 |
|
13 | 18 | from pccommon.logging import get_custom_dimensions
|
14 |
| -from pccommon.tracing import trace_request |
15 | 19 |
|
16 | 20 | logger = logging.getLogger(__name__)
|
17 | 21 |
|
18 | 22 |
|
19 |
| -async def handle_exceptions( |
20 |
| - request: Request, |
21 |
| - call_next: Callable[[Request], Awaitable[Response]], |
22 |
| -) -> Response: |
23 |
| - try: |
24 |
| - return await call_next(request) |
25 |
| - except HTTPException: |
| 23 | +async def http_exception_handler(request: Request, exc: Exception) -> Any: |
| 24 | + # Log the exception with additional request info if needed |
| 25 | + logger.exception("Exception when handling request", exc_info=exc) |
| 26 | + # Return a custom response for HTTPException |
| 27 | + if isinstance(exc, HTTPException): |
26 | 28 | raise
|
27 |
| - except Exception as e: |
| 29 | + # Handle other exceptions, possibly with a generic response |
| 30 | + else: |
28 | 31 | logger.exception(
|
29 | 32 | "Exception when handling request",
|
30 |
| - extra=get_custom_dimensions({"stackTrace": f"{e}"}, request), |
| 33 | + extra=get_custom_dimensions({"stackTrace": f"{exc}"}, request), |
31 | 34 | )
|
32 | 35 | raise
|
33 | 36 |
|
34 | 37 |
|
35 |
| -class RequestTracingMiddleware(BaseHTTPMiddleware): |
36 |
| - """Custom middleware to use opencensus request traces |
37 |
| -
|
38 |
| - Middleware implementations that access a Request object directly |
39 |
| - will cause subsequent middleware or route handlers to hang. See |
40 |
| -
|
41 |
| - https://github.com/tiangolo/fastapi/issues/394 |
42 |
| -
|
43 |
| - for more details on this implementation. |
44 |
| -
|
45 |
| - An alternative approach is to use dependencies on the APIRouter, but |
46 |
| - the stac-fast api implementation makes that difficult without having |
47 |
| - to override much of the app initialization. |
48 |
| - """ |
49 |
| - |
50 |
| - def __init__(self, app: FastAPI, service_name: str): |
51 |
| - super().__init__(app) |
52 |
| - self.service_name = service_name |
53 |
| - |
54 |
| - async def set_body(self, request: Request) -> None: |
55 |
| - receive_ = await request._receive() |
56 |
| - |
57 |
| - async def receive() -> Message: |
58 |
| - return receive_ |
59 |
| - |
60 |
| - request._receive = receive |
61 |
| - |
62 |
| - async def dispatch( |
63 |
| - self, request: Request, call_next: Callable[[Request], Awaitable[Response]] |
64 |
| - ) -> Response: |
65 |
| - await self.set_body(request) |
66 |
| - response = await trace_request(self.service_name, request, call_next) |
67 |
| - return response |
68 |
| - |
69 |
| - |
70 |
| -async def timeout_middleware( |
71 |
| - request: Request, |
72 |
| - call_next: Callable[[Request], Awaitable[Response]], |
73 |
| - timeout: int, |
74 |
| -) -> Response: |
75 |
| - try: |
76 |
| - start_time = time.time() |
77 |
| - return await asyncio.wait_for(call_next(request), timeout=timeout) |
78 |
| - |
79 |
| - except asyncio.TimeoutError: |
80 |
| - process_time = time.time() - start_time |
81 |
| - log_dimensions = get_custom_dimensions({"request_time": process_time}, request) |
82 |
| - |
83 |
| - logger.exception( |
84 |
| - "Request timeout", |
85 |
| - extra=log_dimensions, |
86 |
| - ) |
87 |
| - |
88 |
| - ref_id = log_dimensions["custom_dimensions"].get("ref_id") |
89 |
| - debug_msg = f"Debug information for support: {ref_id}" if ref_id else "" |
90 |
| - |
91 |
| - return PlainTextResponse( |
92 |
| - f"The request exceeded the maximum allowed time, please try again." |
93 |
| - " If the issue persists, please contact [email protected]." |
94 |
| - f"\n\n{debug_msg}", |
95 |
| - status_code=HTTP_504_GATEWAY_TIMEOUT, |
96 |
| - ) |
| 38 | +def with_timeout( |
| 39 | + timeout_seconds: float, |
| 40 | +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: |
| 41 | + def with_timeout_(func: Callable[..., Any]) -> Callable[..., Any]: |
| 42 | + if asyncio.iscoroutinefunction(func): |
| 43 | + logger.debug("Adding timeout to function %s", func.__name__) |
| 44 | + |
| 45 | + @wraps(func) |
| 46 | + async def inner(*args: Any, **kwargs: Any) -> Any: |
| 47 | + start_time = time.monotonic() |
| 48 | + try: |
| 49 | + return await asyncio.wait_for( |
| 50 | + func(*args, **kwargs), timeout=timeout_seconds |
| 51 | + ) |
| 52 | + except asyncio.TimeoutError as e: |
| 53 | + process_time = time.monotonic() - start_time |
| 54 | + # don't have a request object here to get custom dimensions. |
| 55 | + log_dimensions = { |
| 56 | + "request_time": process_time, |
| 57 | + } |
| 58 | + logger.exception( |
| 59 | + f"Request timeout {e}", |
| 60 | + extra=log_dimensions, |
| 61 | + ) |
| 62 | + |
| 63 | + ref_id = log_dimensions.get("ref_id") |
| 64 | + debug_msg = ( |
| 65 | + f" Debug information for support: {ref_id}" if ref_id else "" |
| 66 | + ) |
| 67 | + |
| 68 | + return PlainTextResponse( |
| 69 | + f"The request exceeded the maximum allowed time, please" |
| 70 | + " try again. If the issue persists, please contact " |
| 71 | + |
| 72 | + f"\n\n{debug_msg}", |
| 73 | + status_code=HTTP_504_GATEWAY_TIMEOUT, |
| 74 | + ) |
| 75 | + |
| 76 | + return inner |
| 77 | + else: |
| 78 | + return func |
| 79 | + |
| 80 | + return with_timeout_ |
| 81 | + |
| 82 | + |
| 83 | +def add_timeout(app: FastAPI, timeout_seconds: float) -> None: |
| 84 | + for route in app.router.routes: |
| 85 | + if isinstance(route, APIRoute): |
| 86 | + new_endpoint = with_timeout(timeout_seconds)(route.endpoint) |
| 87 | + route.endpoint = new_endpoint |
| 88 | + route.dependant = get_dependant(path=route.path_format, call=route.endpoint) |
| 89 | + for depends in route.dependencies[::-1]: |
| 90 | + route.dependant.dependencies.insert( |
| 91 | + 0, |
| 92 | + get_parameterless_sub_dependant( |
| 93 | + depends=depends, path=route.path_format |
| 94 | + ), |
| 95 | + ) |
| 96 | + route.body_field = get_body_field( |
| 97 | + dependant=route.dependant, name=route.unique_id |
| 98 | + ) |
| 99 | + route.app = request_response(route.get_route_handler()) |
0 commit comments