Skip to content

Commit

Permalink
Metrics start
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Oct 28, 2024
1 parent d34e93e commit 7e7d2d4
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 10 deletions.
63 changes: 58 additions & 5 deletions edb/server/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import os
import json as json_lib
import urllib.parse
import time
from http import HTTPStatus as HTTPStatus

from edb.server._http import Http
Expand All @@ -42,8 +43,28 @@
HeaderType = Optional[Union[list[tuple[str, str]], dict[str, str]]]


@dataclasses.dataclass(frozen=True)
class HttpStat:
response_time_ms: int
error_code: int
response_body_size: int
response_content_type: str
request_body_size: int
request_content_type: str
method: str
streaming: bool


StatCallback = Callable[[HttpStat], None]


class HttpClient:
def __init__(self, limit: int):
def __init__(
self,
limit: int,
user_agent: str = "EdgeDB",
stat_callback: Optional[StatCallback] = None,
):
self._client = Http(limit)
self._fd = self._client._fd
self._task = None
Expand All @@ -53,6 +74,8 @@ def __init__(self, limit: int):
self._streaming: dict[int, asyncio.Queue[Any]] = {}
self._next_id = 0
self._requests: dict[int, asyncio.Future] = {}
self._user_agent = user_agent
self._stat_callback = stat_callback

def __del__(self) -> None:
self.close()
Expand Down Expand Up @@ -123,7 +146,6 @@ def with_context(
headers=headers,
url_munger=url_munger,
) # type: ignore

async def request(
self,
*,
Expand All @@ -135,14 +157,27 @@ async def request(
) -> tuple[int, bytes, dict[str, str]]:
path = self._process_path(path)
headers_list = self._process_headers(headers)
headers_list.append(("User-Agent", "EdgeDB"))
headers_list.append(("User-Agent", self._user_agent))
data = self._process_content(headers_list, data, json)
id = self._next_id
self._next_id += 1
self._requests[id] = asyncio.Future()
start_time = time.time()
try:
self._client._request(id, path, method, data, headers_list)
resp = await self._requests[id]
if self._stat_callback:
status_code, body, headers = resp
self._stat_callback(HttpStat(
response_time_ms=int((time.time() - start_time) * 1000),
error_code=status_code,
response_body_size=len(body),
response_content_type=headers.get('content-type', ''),
request_body_size=len(data),
request_content_type=dict(headers_list).get('content-type', ''),
method=method,
streaming=False
))
return resp
finally:
del self._requests[id]
Expand Down Expand Up @@ -177,23 +212,40 @@ async def stream_sse(
) -> Response | ResponseSSE:
path = self._process_path(path)
headers_list = self._process_headers(headers)
headers_list.append(("User-Agent", "EdgeDB"))
headers_list.append(("User-Agent", self._user_agent))
data = self._process_content(headers_list, data, json)

id = self._next_id
self._next_id += 1
self._requests[id] = asyncio.Future()
start_time = time.time()
try:
self._client._request_sse(id, path, method, data, headers_list)
resp = await self._requests[id]
if self._stat_callback:
if id in self._streaming:
status_code, headers = resp
body = b''
else:
status_code, body, headers = resp
self._stat_callback(HttpStat(
response_time_ms=int((time.time() - start_time) * 1000),
error_code=status_code,
response_body_size=len(body),
response_content_type=headers.get('content-type', ''),
request_body_size=len(data),
request_content_type=dict(headers_list).get('content-type', ''),
method=method,
streaming=id in self._streaming
))
if id in self._streaming:
return ResponseSSE.from_tuple(resp, self._streaming[id])
return Response.from_tuple(resp)
finally:
del self._requests[id]

async def _boot(self, loop: asyncio.AbstractEventLoop) -> None:
logger.info("Python-side HTTP client booted")
logger.info(f"HTTP client initialized, user_agent={self._user_agent}")
reader = asyncio.StreamReader(loop=loop)
reader_protocol = asyncio.StreamReaderProtocol(reader)
fd = os.fdopen(self._client._fd, 'rb')
Expand Down Expand Up @@ -402,3 +454,4 @@ def __enter__(self) -> Self:

def __exit__(self, exc_type, exc_value, traceback):
pass

5 changes: 1 addition & 4 deletions edb/server/net_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ async def _http_task(tenant: edbtenant.Tenant, http_client) -> None:


def create_http(tenant: edbtenant.Tenant):
http_max_connections = tenant._server.config_lookup(
'http_max_connections', tenant.get_sys_config()
)
return HttpClient(http_max_connections)
return tenant.get_http_client(originator="std::net")


async def http(server: edbserver.BaseServer) -> None:
Expand Down
9 changes: 8 additions & 1 deletion edb/server/tenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,14 @@ def set_server(self, server: edbserver.BaseServer) -> None:

def get_http_client(self, *, originator: str) -> HttpClient:
if self._http_client is None:
self._http_client = HttpClient(HTTP_MAX_CONNECTIONS)
http_max_connections = self._server.config_lookup(
'http_max_connections', self.get_sys_config()
)
self._http_client = HttpClient(
http_max_connections,
user_agent=f"EdgeDB {buildmeta.get_version_string(short=True)}",
stat_callback=lambda stat: logger.info(f"HTTP stat: {originator} {stat}"),
)
return self._http_client

def on_switch_over(self):
Expand Down

0 comments on commit 7e7d2d4

Please sign in to comment.