Add per-request HTTP statistics and a configurable User-Agent.

* edb/server/http.py: introduce the frozen HttpStat record and the
  StatCallback alias. HttpClient accepts `user_agent` and
  `stat_callback`, sends the configured User-Agent, and reports one
  HttpStat after each successfully awaited request()/stream_sse() call.
  Elapsed time is measured with time.monotonic() and the request
  Content-Type is looked up case-insensitively.
* edb/server/net_worker.py: create_http() reuses the tenant's client.
* edb/server/tenant.py: the shared client is sized from the
  `http_max_connections` config value, advertises the build version and
  logs every HttpStat. The log line is not tagged with `originator`
  because the client is created once and then shared by all callers.

The `index` lines keep the blob ids recorded when the patch was first
generated.

diff --git a/edb/server/http.py b/edb/server/http.py
index a4eaeaf4f2da..2c6fe86060c1 100644
--- a/edb/server/http.py
+++ b/edb/server/http.py
@@ -34,6 +34,7 @@
 import os
 import json as json_lib
 import urllib.parse
+import time
 from http import HTTPStatus as HTTPStatus
 
 from edb.server._http import Http
@@ -42,8 +43,28 @@
 HeaderType = Optional[Union[list[tuple[str, str]], dict[str, str]]]
 
 
+@dataclasses.dataclass(frozen=True)
+class HttpStat:
+    response_time_ms: int
+    error_code: int
+    response_body_size: int
+    response_content_type: str
+    request_body_size: int
+    request_content_type: str
+    method: str
+    streaming: bool
+
+
+StatCallback = Callable[[HttpStat], None]
+
+
 class HttpClient:
-    def __init__(self, limit: int):
+    def __init__(
+        self,
+        limit: int,
+        user_agent: str = "EdgeDB",
+        stat_callback: Optional[StatCallback] = None,
+    ):
         self._client = Http(limit)
         self._fd = self._client._fd
         self._task = None
@@ -53,6 +74,8 @@ def __init__(self, limit: int):
         self._streaming: dict[int, asyncio.Queue[Any]] = {}
         self._next_id = 0
         self._requests: dict[int, asyncio.Future] = {}
+        self._user_agent = user_agent
+        self._stat_callback = stat_callback
 
     def __del__(self) -> None:
         self.close()
@@ -135,14 +158,29 @@ async def request(
     ) -> tuple[int, bytes, dict[str, str]]:
         path = self._process_path(path)
         headers_list = self._process_headers(headers)
-        headers_list.append(("User-Agent", "EdgeDB"))
+        headers_list.append(("User-Agent", self._user_agent))
         data = self._process_content(headers_list, data, json)
         id = self._next_id
         self._next_id += 1
         self._requests[id] = asyncio.Future()
+        start_time = time.monotonic()
         try:
             self._client._request(id, path, method, data, headers_list)
             resp = await self._requests[id]
+            if self._stat_callback is not None:
+                elapsed_ms = int((time.monotonic() - start_time) * 1000)
+                status_code, body, resp_headers = resp
+                req_headers = {k.lower(): v for k, v in headers_list}
+                self._stat_callback(HttpStat(
+                    response_time_ms=elapsed_ms,
+                    error_code=status_code,
+                    response_body_size=len(body),
+                    response_content_type=resp_headers.get('content-type', ''),
+                    request_body_size=len(data),
+                    request_content_type=req_headers.get('content-type', ''),
+                    method=method,
+                    streaming=False,
+                ))
             return resp
         finally:
             del self._requests[id]
@@ -177,15 +215,35 @@ async def stream_sse(
     ) -> Response | ResponseSSE:
         path = self._process_path(path)
         headers_list = self._process_headers(headers)
-        headers_list.append(("User-Agent", "EdgeDB"))
+        headers_list.append(("User-Agent", self._user_agent))
         data = self._process_content(headers_list, data, json)
 
         id = self._next_id
         self._next_id += 1
         self._requests[id] = asyncio.Future()
+        start_time = time.monotonic()
         try:
             self._client._request_sse(id, path, method, data, headers_list)
             resp = await self._requests[id]
+            if self._stat_callback is not None:
+                elapsed_ms = int((time.monotonic() - start_time) * 1000)
+                streaming = id in self._streaming
+                if streaming:
+                    status_code, resp_headers = resp
+                    body = b''
+                else:
+                    status_code, body, resp_headers = resp
+                req_headers = {k.lower(): v for k, v in headers_list}
+                self._stat_callback(HttpStat(
+                    response_time_ms=elapsed_ms,
+                    error_code=status_code,
+                    response_body_size=len(body),
+                    response_content_type=resp_headers.get('content-type', ''),
+                    request_body_size=len(data),
+                    request_content_type=req_headers.get('content-type', ''),
+                    method=method,
+                    streaming=streaming,
+                ))
             if id in self._streaming:
                 return ResponseSSE.from_tuple(resp, self._streaming[id])
             return Response.from_tuple(resp)
@@ -193,7 +251,7 @@ async def stream_sse(
             del self._requests[id]
 
     async def _boot(self, loop: asyncio.AbstractEventLoop) -> None:
-        logger.info("Python-side HTTP client booted")
+        logger.info(f"HTTP client initialized, user_agent={self._user_agent}")
         reader = asyncio.StreamReader(loop=loop)
         reader_protocol = asyncio.StreamReaderProtocol(reader)
         fd = os.fdopen(self._client._fd, 'rb')
diff --git a/edb/server/net_worker.py b/edb/server/net_worker.py
index e6e196d7e153..784eea387248 100644
--- a/edb/server/net_worker.py
+++ b/edb/server/net_worker.py
@@ -99,10 +99,7 @@ async def _http_task(tenant: edbtenant.Tenant, http_client) -> None:
 
 
 def create_http(tenant: edbtenant.Tenant):
-    http_max_connections = tenant._server.config_lookup(
-        'http_max_connections', tenant.get_sys_config()
-    )
-    return HttpClient(http_max_connections)
+    return tenant.get_http_client(originator="std::net")
 
 
 async def http(server: edbserver.BaseServer) -> None:
diff --git a/edb/server/tenant.py b/edb/server/tenant.py
index a4d11725c4f7..a37f5c65d1e4 100644
--- a/edb/server/tenant.py
+++ b/edb/server/tenant.py
@@ -248,7 +248,20 @@ def set_server(self, server: edbserver.BaseServer) -> None:
 
     def get_http_client(self, *, originator: str) -> HttpClient:
         if self._http_client is None:
-            self._http_client = HttpClient(HTTP_MAX_CONNECTIONS)
+            http_max_connections = self._server.config_lookup(
+                'http_max_connections', self.get_sys_config()
+            )
+            version = buildmeta.get_version_string(short=True)
+            self._http_client = HttpClient(
+                http_max_connections,
+                user_agent=f"EdgeDB {version}",
+                # The client is cached and shared by every caller, so
+                # the stat line must not be tagged with the originator
+                # of whichever call happened to create it.
+                stat_callback=lambda stat: logger.info(
+                    "HTTP stat: %s", stat
+                ),
+            )
         return self._http_client
 
     def on_switch_over(self):