Skip to content

Commit

Permalink
Add the ability to run internal queries with reduced tx isolation (#7882
Browse files Browse the repository at this point in the history
)

Internal queues, such as `net::http::ScheduledRequest` are structured
in a way that produces lots of serialization errors if `serializable`
isolation is used.  To side-step this, add a way to run internal queries
with reduced isolation level (via the new `tx_isolation` argument to
`execute()` and `execute_json()`.

As a proof of concept, switch `net_worker` to `repeatable read`.
  • Loading branch information
elprans authored Oct 17, 2024
1 parent 8319bbe commit 78bf15e
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 9 deletions.
7 changes: 7 additions & 0 deletions edb/server/defines.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from typing import TypeAlias

from edb import buildmeta

from edb.common import enum as s_enum
from edb.schema import defines as s_def


Expand Down Expand Up @@ -93,3 +95,8 @@
# The time in seconds the EdgeDB server will wait for a tenant to be gracefully
# shutdown when removed from a multi-tenant host.
MULTITENANT_TENANT_DESTROY_TIMEOUT = 30


class TxIsolationLevel(s_enum.StrEnum):
RepeatableRead = 'REPEATABLE READ'
Serializable = 'SERIALIZABLE'
2 changes: 2 additions & 0 deletions edb/server/net_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async def _http_task(tenant: edbtenant.Tenant, http_client) -> None:
}
""",
cached_globally=True,
tx_isolation=defines.TxIsolationLevel.RepeatableRead,
)
pending_requests: list[dict] = json.loads(json_bytes)
for pending_request in pending_requests:
Expand Down Expand Up @@ -306,6 +307,7 @@ async def _update_request():
'failure': json.dumps(failure),
},
cached_globally=True,
tx_isolation=defines.TxIsolationLevel.RepeatableRead,
)

await _update_request()
30 changes: 21 additions & 9 deletions edb/server/pgcon/pgcon.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,7 @@ cdef class PGConnection:
bytes state,
int dbver,
bint use_pending_func_cache,
tx_isolation,
):
cdef:
WriteBuffer out
Expand Down Expand Up @@ -846,9 +847,10 @@ cdef class PGConnection:
if state is not None:
self._build_apply_state_req(state, out)
if (
query.tx_id or
not query.is_transactional or
query.append_rollback
query.tx_id
or not query.is_transactional
or query.append_rollback
or tx_isolation is not None
):
# This query has START TRANSACTION or non-transactional command
# like CREATE DATABASE in it.
Expand All @@ -859,13 +861,18 @@ cdef class PGConnection:
state_sync = 1
self.write_sync(out)

if query.append_rollback:
if query.append_rollback or tx_isolation is not None:
if self.in_tx():
sp_name = f'_edb_{time.monotonic_ns()}'
sql = f'SAVEPOINT {sp_name}'.encode('utf-8')
else:
sp_name = None
sql = b'START TRANSACTION'
if tx_isolation is not None:
sql += (
f' ISOLATION LEVEL {tx_isolation._value_}'
.encode('utf-8')
)

buf = WriteBuffer.new_message(b'P')
buf.write_bytestring(b'')
Expand Down Expand Up @@ -956,11 +963,14 @@ cdef class PGConnection:
buf.write_int32(0) # limit: 0 - return all rows
out.write_buffer(buf.end_message())

if query.append_rollback:
if sp_name:
sql = f'ROLLBACK TO SAVEPOINT {sp_name}'.encode('utf-8')
if query.append_rollback or tx_isolation is not None:
if query.append_rollback:
if sp_name:
sql = f'ROLLBACK TO SAVEPOINT {sp_name}'.encode('utf-8')
else:
sql = b'ROLLBACK'
else:
sql = b'ROLLBACK'
sql = b'COMMIT'

buf = WriteBuffer.new_message(b'P')
buf.write_bytestring(b'')
Expand Down Expand Up @@ -990,7 +1000,7 @@ cdef class PGConnection:
if state is not None:
await self.wait_for_state_resp(state, state_sync)

if query.append_rollback:
if query.append_rollback or tx_isolation is not None:
await self.wait_for_sync()

buf = None
Expand Down Expand Up @@ -1095,6 +1105,7 @@ cdef class PGConnection:
bytes state = None,
int dbver = 0,
bint use_pending_func_cache = 0,
tx_isolation = None,
):
self.before_command()
started_at = time.monotonic()
Expand All @@ -1107,6 +1118,7 @@ cdef class PGConnection:
state,
dbver,
use_pending_func_cache,
tx_isolation,
)
finally:
metrics.backend_query_duration.observe(
Expand Down
2 changes: 2 additions & 0 deletions edb/server/protocol/execute.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import immutables

from edb import errors
from edb.server import compiler
from edb.server import defines as edbdef
from edb.server.compiler import sertypes
from edb.server.dbview import dbview

Expand All @@ -48,6 +49,7 @@ async def parse_execute_json(
query_cache_enabled: Optional[bool] = None,
cached_globally: bool = False,
use_metrics: bool = True,
tx_isolation: edbdef.TxIsolationLevel | None = None,
) -> bytes:
...

Expand Down
25 changes: 25 additions & 0 deletions edb/server/protocol/execute.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ async def execute(
*,
fe_conn: frontend.AbstractFrontendConnection = None,
use_prep_stmt: bint = False,
tx_isolation: edbdef.TxIsolationLevel | None = None,
):
cdef:
bytes state = None, orig_state = None
Expand Down Expand Up @@ -297,6 +298,7 @@ async def execute(
state=state,
dbver=dbv.dbver,
use_pending_func_cache=compiled.use_pending_func_cache,
tx_isolation=tx_isolation,
)

if query_unit.needs_readback and data:
Expand Down Expand Up @@ -700,6 +702,7 @@ async def parse_execute_json(
query_cache_enabled: Optional[bool] = None,
cached_globally: bool = False,
use_metrics: bool = True,
tx_isolation: edbdef.TxIsolationLevel | None = None,
) -> bytes:
# WARNING: only set cached_globally to True when the query is
# strictly referring to only shared stable objects in user schema
Expand All @@ -726,6 +729,7 @@ async def parse_execute_json(
compiled,
variables=variables,
globals_=globals_,
tx_isolation=tx_isolation,
)
finally:
tenant.remove_dbview(dbv)
Expand All @@ -740,6 +744,7 @@ async def execute_json(
*,
fe_conn: Optional[frontend.AbstractFrontendConnection] = None,
use_prep_stmt: bint = False,
tx_isolation: edbdef.TxIsolationLevel | None = None,
) -> bytes:
dbv.set_globals(immutables.Map({
"__::__edb_json_globals__": config.SettingValue(
Expand All @@ -762,6 +767,11 @@ async def execute_json(

force_script = any(x.needs_readback for x in qug)
if len(qug) > 1 or force_script:
if tx_isolation is not None:
raise errors.InternalServerError(
"execute_script does not support "
"modified transaction isolation"
)
data = await execute_script(
be_conn,
dbv,
Expand All @@ -770,12 +780,27 @@ async def execute_json(
fe_conn=fe_conn,
)
else:
if tx_isolation is not None:
if dbv.in_tx():
raise errors.InternalServerError(
"cannot run statement with alternate transaction "
"isolation: already in a transaction"
)

query_unit = compiled.query_unit_group[0]
if not query_unit.is_transactional:
raise errors.InternalServerError(
"cannot run statement with alternate transaction "
"isolation: statement is not transactional"
)

data = await execute(
be_conn,
dbv,
compiled,
bind_args,
fe_conn=fe_conn,
tx_isolation=tx_isolation,
)

if fe_conn is None:
Expand Down

0 comments on commit 78bf15e

Please sign in to comment.