From 14564d3725ee0066d7e427360b519b76cdc48304 Mon Sep 17 00:00:00 2001 From: "Michael J. Sullivan" Date: Wed, 23 Oct 2024 14:52:46 -0700 Subject: [PATCH] Some more changes to reduce adjacent server flakes 1. Make worker threads (like net) not attempt new connections once the server has started shutting down. 2. Don't respond to messages that prompt updating database metadata for databases we haven't introspected anyway. 3. Mark a lot of worker tasks as being interruptable. When they weren't interruptable, server exit had to wait for them to exit, which they didn't always do. This caused connections to get held open. *Most* of the locally observable flakes I've been seeing were fixed by these changes, though I am still getting test_server_adjacent_database_propagation failures where it fails to connect to the newly created database. Still working on that. --- edb/server/dbview/dbview.pyi | 3 +++ edb/server/dbview/dbview.pyx | 3 +++ edb/server/net_worker.py | 4 ++-- edb/server/protocol/auth_ext/email.py | 2 +- edb/server/protocol/auth_ext/pkce.py | 2 +- edb/server/tenant.py | 19 ++++++++++++++++--- 6 files changed, 26 insertions(+), 7 deletions(-) diff --git a/edb/server/dbview/dbview.pyi b/edb/server/dbview/dbview.pyi index caac4e4171b..f26fd0d7186 100644 --- a/edb/server/dbview/dbview.pyi +++ b/edb/server/dbview/dbview.pyi @@ -118,6 +118,9 @@ class Database: def lookup_config(self, name: str) -> Any: ... + def is_introspected(self) -> bool: + ... + class DatabaseConnectionView: def in_tx(self) -> bool: ... 
diff --git a/edb/server/dbview/dbview.pyx b/edb/server/dbview/dbview.pyx index ed376cf16ba..af3336de0e2 100644 --- a/edb/server/dbview/dbview.pyx +++ b/edb/server/dbview/dbview.pyx @@ -453,6 +453,9 @@ cdef class Database: if self.user_schema_pickle is None: await self.tenant.introspect_db(self.name) + def is_introspected(self): + return self.user_schema_pickle is not None + def lookup_config(self, name: str): spec = self._index._sys_config_spec if self.user_config_spec is not None: diff --git a/edb/server/net_worker.py b/edb/server/net_worker.py index 8c82fa75e84..18fd33c08ed 100644 --- a/edb/server/net_worker.py +++ b/edb/server/net_worker.py @@ -120,7 +120,7 @@ async def http(server: edbserver.BaseServer) -> None: tasks.append( tenant.create_task( _http_task(tenant, tenant_http[tenant]), - interruptable=False, + interruptable=True, ) ) # Remove unused tenant_http entries @@ -310,7 +310,7 @@ async def gc(server: edbserver.BaseServer) -> None: while True: tasks = [ tenant.create_task( - _gc(tenant, NET_HTTP_REQUEST_TTL), interruptable=False + _gc(tenant, NET_HTTP_REQUEST_TTL), interruptable=True ) for tenant in server.iter_tenants() if tenant.accept_new_tasks diff --git a/edb/server/protocol/auth_ext/email.py b/edb/server/protocol/auth_ext/email.py index bffe9f1f5c4..826ae5bb111 100644 --- a/edb/server/protocol/auth_ext/email.py +++ b/edb/server/protocol/auth_ext/email.py @@ -131,7 +131,7 @@ async def noop_coroutine() -> None: async def _protected_send( coro: Coroutine[Any, Any, None], tenant: tenant.Tenant ) -> None: - task = tenant.create_task(coro, interruptable=False) + task = tenant.create_task(coro, interruptable=True) # Prevent timing attack await asyncio.sleep(random.random() * 0.5) # Expose e.g. 
configuration errors diff --git a/edb/server/protocol/auth_ext/pkce.py b/edb/server/protocol/auth_ext/pkce.py index a8acfdc5752..b4d1c5ddde8 100644 --- a/edb/server/protocol/auth_ext/pkce.py +++ b/edb/server/protocol/auth_ext/pkce.py @@ -193,7 +193,7 @@ async def gc(server: edbserver.BaseServer) -> None: while True: try: tasks = [ - tenant.create_task(_gc(tenant), interruptable=False) + tenant.create_task(_gc(tenant), interruptable=True) for tenant in server.iter_tenants() if tenant.accept_new_tasks ] diff --git a/edb/server/tenant.py b/edb/server/tenant.py index d0ce0c0ecb1..948e39fbc62 100644 --- a/edb/server/tenant.py +++ b/edb/server/tenant.py @@ -591,6 +591,18 @@ def stop_accepting_connections(self) -> None: def accept_new_tasks(self): return self._accept_new_tasks + def is_db_ready(self, dbname: str) -> bool: + if not self._accept_new_tasks: + return False + + if ( + not (db := self.maybe_get_db(dbname=dbname)) + or not db.is_introspected() + ): + return False + + return True + def create_task( self, coro: Coroutine, @@ -962,7 +974,8 @@ def allow_database_connections(self, dbname: str) -> None: def is_database_connectable(self, dbname: str) -> bool: return ( - dbname != defines.EDGEDB_TEMPLATE_DB + self._running + and dbname != defines.EDGEDB_TEMPLATE_DB and dbname not in self._block_new_connections ) @@ -1662,7 +1675,7 @@ async def task(): self.create_task(task(), interruptable=True) def on_remote_ddl(self, dbname: str) -> None: - if not self._accept_new_tasks: + if not self.is_db_ready(dbname): return # Triggered by a postgres notification event 'schema-changes' @@ -1826,7 +1839,7 @@ def on_remote_query_cache_change( dbname: str, keys: Optional[list[str]], ) -> None: - if not self._accept_new_tasks: + if not self.is_db_ready(dbname): return async def task():