From 78bf15ed684a5cdfd8f8d5f7b5f3ed3885544882 Mon Sep 17 00:00:00 2001 From: Elvis Pranskevichus Date: Thu, 17 Oct 2024 14:13:05 -0700 Subject: [PATCH] Add the ability to run internal queries with reduced tx isolation (#7882) Internal queues, such as `net::http::ScheduledRequest`, are structured in a way that produces lots of serialization errors if `serializable` isolation is used. To side-step this, add a way to run internal queries with reduced isolation level (via the new `tx_isolation` argument to `execute()` and `execute_json()`). As a proof of concept, switch `net_worker` to `repeatable read`. --- edb/server/defines.py | 7 +++++++ edb/server/net_worker.py | 2 ++ edb/server/pgcon/pgcon.pyx | 30 +++++++++++++++++++++--------- edb/server/protocol/execute.pyi | 2 ++ edb/server/protocol/execute.pyx | 25 +++++++++++++++++++++++++ 5 files changed, 57 insertions(+), 9 deletions(-) diff --git a/edb/server/defines.py b/edb/server/defines.py index 5122e8b9102..d305178a625 100644 --- a/edb/server/defines.py +++ b/edb/server/defines.py @@ -21,6 +21,8 @@ from typing import TypeAlias from edb import buildmeta + +from edb.common import enum as s_enum from edb.schema import defines as s_def @@ -93,3 +95,8 @@ # The time in seconds the EdgeDB server will wait for a tenant to be gracefully # shutdown when removed from a multi-tenant host. 
MULTITENANT_TENANT_DESTROY_TIMEOUT = 30 + + +class TxIsolationLevel(s_enum.StrEnum): + RepeatableRead = 'REPEATABLE READ' + Serializable = 'SERIALIZABLE' diff --git a/edb/server/net_worker.py b/edb/server/net_worker.py index 45ceb7ee0d6..2e186062fcb 100644 --- a/edb/server/net_worker.py +++ b/edb/server/net_worker.py @@ -80,6 +80,7 @@ async def _http_task(tenant: edbtenant.Tenant, http_client) -> None: } """, cached_globally=True, + tx_isolation=defines.TxIsolationLevel.RepeatableRead, ) pending_requests: list[dict] = json.loads(json_bytes) for pending_request in pending_requests: @@ -306,6 +307,7 @@ async def _update_request(): 'failure': json.dumps(failure), }, cached_globally=True, + tx_isolation=defines.TxIsolationLevel.RepeatableRead, ) await _update_request() diff --git a/edb/server/pgcon/pgcon.pyx b/edb/server/pgcon/pgcon.pyx index 31f9dd62664..2654fa3dc60 100644 --- a/edb/server/pgcon/pgcon.pyx +++ b/edb/server/pgcon/pgcon.pyx @@ -814,6 +814,7 @@ cdef class PGConnection: bytes state, int dbver, bint use_pending_func_cache, + tx_isolation, ): cdef: WriteBuffer out @@ -846,9 +847,10 @@ cdef class PGConnection: if state is not None: self._build_apply_state_req(state, out) if ( - query.tx_id or - not query.is_transactional or - query.append_rollback + query.tx_id + or not query.is_transactional + or query.append_rollback + or tx_isolation is not None ): # This query has START TRANSACTION or non-transactional command # like CREATE DATABASE in it. 
@@ -859,13 +861,18 @@ cdef class PGConnection: state_sync = 1 self.write_sync(out) - if query.append_rollback: + if query.append_rollback or tx_isolation is not None: if self.in_tx(): sp_name = f'_edb_{time.monotonic_ns()}' sql = f'SAVEPOINT {sp_name}'.encode('utf-8') else: sp_name = None sql = b'START TRANSACTION' + if tx_isolation is not None: + sql += ( + f' ISOLATION LEVEL {tx_isolation._value_}' + .encode('utf-8') + ) buf = WriteBuffer.new_message(b'P') buf.write_bytestring(b'') @@ -956,11 +963,14 @@ cdef class PGConnection: buf.write_int32(0) # limit: 0 - return all rows out.write_buffer(buf.end_message()) - if query.append_rollback: - if sp_name: - sql = f'ROLLBACK TO SAVEPOINT {sp_name}'.encode('utf-8') + if query.append_rollback or tx_isolation is not None: + if query.append_rollback: + if sp_name: + sql = f'ROLLBACK TO SAVEPOINT {sp_name}'.encode('utf-8') + else: + sql = b'ROLLBACK' else: - sql = b'ROLLBACK' + sql = b'COMMIT' buf = WriteBuffer.new_message(b'P') buf.write_bytestring(b'') @@ -990,7 +1000,7 @@ cdef class PGConnection: if state is not None: await self.wait_for_state_resp(state, state_sync) - if query.append_rollback: + if query.append_rollback or tx_isolation is not None: await self.wait_for_sync() buf = None @@ -1095,6 +1105,7 @@ cdef class PGConnection: bytes state = None, int dbver = 0, bint use_pending_func_cache = 0, + tx_isolation = None, ): self.before_command() started_at = time.monotonic() @@ -1107,6 +1118,7 @@ cdef class PGConnection: state, dbver, use_pending_func_cache, + tx_isolation, ) finally: metrics.backend_query_duration.observe( diff --git a/edb/server/protocol/execute.pyi b/edb/server/protocol/execute.pyi index 053ec6643f5..4bf7557adb8 100644 --- a/edb/server/protocol/execute.pyi +++ b/edb/server/protocol/execute.pyi @@ -25,6 +25,7 @@ import immutables from edb import errors from edb.server import compiler +from edb.server import defines as edbdef from edb.server.compiler import sertypes from edb.server.dbview import 
dbview @@ -48,6 +49,7 @@ async def parse_execute_json( query_cache_enabled: Optional[bool] = None, cached_globally: bool = False, use_metrics: bool = True, + tx_isolation: edbdef.TxIsolationLevel | None = None, ) -> bytes: ... diff --git a/edb/server/protocol/execute.pyx b/edb/server/protocol/execute.pyx index 08b08cefae9..2717aabea65 100644 --- a/edb/server/protocol/execute.pyx +++ b/edb/server/protocol/execute.pyx @@ -233,6 +233,7 @@ async def execute( *, fe_conn: frontend.AbstractFrontendConnection = None, use_prep_stmt: bint = False, + tx_isolation: edbdef.TxIsolationLevel | None = None, ): cdef: bytes state = None, orig_state = None @@ -297,6 +298,7 @@ async def execute( state=state, dbver=dbv.dbver, use_pending_func_cache=compiled.use_pending_func_cache, + tx_isolation=tx_isolation, ) if query_unit.needs_readback and data: @@ -700,6 +702,7 @@ async def parse_execute_json( query_cache_enabled: Optional[bool] = None, cached_globally: bool = False, use_metrics: bool = True, + tx_isolation: edbdef.TxIsolationLevel | None = None, ) -> bytes: # WARNING: only set cached_globally to True when the query is # strictly referring to only shared stable objects in user schema @@ -726,6 +729,7 @@ async def parse_execute_json( compiled, variables=variables, globals_=globals_, + tx_isolation=tx_isolation, ) finally: tenant.remove_dbview(dbv) @@ -740,6 +744,7 @@ async def execute_json( *, fe_conn: Optional[frontend.AbstractFrontendConnection] = None, use_prep_stmt: bint = False, + tx_isolation: edbdef.TxIsolationLevel | None = None, ) -> bytes: dbv.set_globals(immutables.Map({ "__::__edb_json_globals__": config.SettingValue( @@ -762,6 +767,11 @@ async def execute_json( force_script = any(x.needs_readback for x in qug) if len(qug) > 1 or force_script: + if tx_isolation is not None: + raise errors.InternalServerError( + "execute_script does not support " + "modified transaction isolation" + ) data = await execute_script( be_conn, dbv, @@ -770,12 +780,27 @@ async def 
execute_json( fe_conn=fe_conn, ) else: + if tx_isolation is not None: + if dbv.in_tx(): + raise errors.InternalServerError( + "cannot run statement with alternate transaction " + "isolation: already in a transaction" + ) + + query_unit = compiled.query_unit_group[0] + if not query_unit.is_transactional: + raise errors.InternalServerError( + "cannot run statement with alternate transaction " + "isolation: statement is not transactional" + ) + data = await execute( be_conn, dbv, compiled, bind_args, fe_conn=fe_conn, + tx_isolation=tx_isolation, ) if fe_conn is None: