Skip to content

Commit

Permalink
Split recompile and writeback in transaction (#6959)
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix authored and msullivan committed Mar 8, 2024
1 parent 7482017 commit 818a3e5
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 144 deletions.
20 changes: 20 additions & 0 deletions edb/server/compiler/rpc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ cdef class CompilationRequest:
):
self._serializer = compilation_config_serializer

def __copy__(self):
# Support copy.copy() on CompilationRequest: build a fresh request that
# reuses the same compilation-config serializer, then mirror every field
# one by one. Fields are plain references (shallow copy), so mutating a
# field on the copy (e.g. set_schema_version) does not affect the
# original — which is exactly why the recompile path copies the request
# before updating its schema_version/database_config/system_config.
cdef CompilationRequest rv = CompilationRequest(self._serializer)
# Source query and protocol/output negotiation flags.
rv.source = self.source
rv.protocol_version = self.protocol_version
rv.output_format = self.output_format
rv.json_parameters = self.json_parameters
rv.expect_one = self.expect_one
rv.implicit_limit = self.implicit_limit
rv.inline_typeids = self.inline_typeids
rv.inline_typenames = self.inline_typenames
rv.inline_objectids = self.inline_objectids
# Session/compilation context that participates in the cache key.
rv.modaliases = self.modaliases
rv.session_config = self.session_config
rv.database_config = self.database_config
rv.system_config = self.system_config
rv.schema_version = self.schema_version
# Carry over any already-computed serialized form and cache key so the
# copy does not have to recompute them (they stay valid until a setter
# invalidates them on the copy).
rv.serialized_cache = self.serialized_cache
rv.cache_key = self.cache_key
return rv

def update(
self,
source: edgeql.Source,
Expand Down
1 change: 1 addition & 0 deletions edb/server/dbview/dbview.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ cdef class CompiledQuery:
cdef public object extra_counts
cdef public object extra_blobs
cdef public object request
cdef public object recompiled_cache


cdef class DatabaseIndex:
Expand Down
119 changes: 55 additions & 64 deletions edb/server/dbview/dbview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ from typing import (

import asyncio
import base64
import copy
import json
import os.path
import pickle
Expand Down Expand Up @@ -85,12 +86,14 @@ cdef class CompiledQuery:
extra_counts=(),
extra_blobs=(),
request=None,
recompiled_cache=None,
):
self.query_unit_group = query_unit_group
self.first_extra = first_extra
self.extra_counts = extra_counts
self.extra_blobs = extra_blobs
self.request = request
self.recompiled_cache = recompiled_cache


cdef class Database:
Expand Down Expand Up @@ -197,7 +200,7 @@ cdef class Database:
query_req, unit_group = self._eql_to_compiled.cleanup_one()
if len(unit_group) == 1:
keys.append(query_req.get_cache_key())
if keys:
if keys and debug.flags.persistent_cache:
self.tenant.create_task(
self.tenant.evict_query_cache(self.name, keys),
interruptable=True,
Expand Down Expand Up @@ -909,39 +912,24 @@ cdef class DatabaseConnectionView:
self._reset_tx_state()
return side_effects

async def clear_cache_keys(self, conn) -> list[rpc.CompilationRequest]:
rows = await conn.sql_fetch(b'SELECT "edgedb"."_clear_query_cache"()')
rv = []
for row in rows:
query_req = rpc.CompilationRequest(
self.server.compilation_config_serializer
).deserialize(row[0], "<unknown>")
rv.append(query_req)
self._db._eql_to_compiled.pop(query_req, None)
execute.signal_query_cache_changes(self)
return rv

async def recompile_all(
self, conn, requests: typing.Iterable[rpc.CompilationRequest]
):
# Assume the size of compiler pool is 100, we'll issue 50 concurrent
# compilation requests at the same time, cache up to 150 results and
# persist in one backend round-trip, in parallel.
async def recompile_cached_queries(self, user_schema, schema_version):
compiler_pool = self.server.get_compiler_pool()
compile_concurrency = max(1, compiler_pool.get_size_hint() // 2)
concurrency_control = asyncio.Semaphore(compile_concurrency)
persist_batch_size = compile_concurrency * 3
compiled_queue = asyncio.Queue(persist_batch_size)
rv = []

async def recompile_request(query_req: rpc.CompilationRequest):
async with concurrency_control:
try:
schema_version = self.schema_version
database_config = self.get_database_config()
system_config = self.get_compilation_system_config()
result = await compiler_pool.compile(
query_req = copy.copy(query_req)
query_req.set_schema_version(schema_version)
query_req.set_database_config(database_config)
query_req.set_system_config(system_config)
unit_group, _, _ = await compiler_pool.compile(
self.dbname,
self.get_user_schema_pickle(),
user_schema,
self.get_global_schema_pickle(),
self.reflection_cache,
database_config,
Expand All @@ -951,49 +939,16 @@ cdef class DatabaseConnectionView:
client_id=self.tenant.client_id,
)
except Exception:
# discard cache entry that cannot be recompiled
self._db._eql_to_compiled.pop(query_req, None)
# ignore cache entry that cannot be recompiled
pass
else:
# schema_version, database_config and system_config are not
# serialized but only affect the cache key. We only update
# these values *after* the compilation so that we can evict
# the in-memory cache by the right key when recompilation
# fails in the `except` branch above.
query_req.set_schema_version(schema_version)
query_req.set_database_config(database_config)
query_req.set_system_config(system_config)

await compiled_queue.put((query_req, result[0]))

async def persist_cache_task():
if not debug.flags.func_cache:
# TODO(fantix): sync _query_cache in one implicit tx
await conn.sql_fetch(b'SELECT "edgedb"."_clear_query_cache"()')

buf = []
running = True
while running:
while len(buf) < persist_batch_size:
item = await compiled_queue.get()
if item is None:
running = False
break
buf.append(item)
if buf:
await execute.persist_cache(
conn, self, [item[:2] for item in buf]
)
for query_req, query_unit_group in buf:
self._db._cache_compiled_query(
query_req, query_unit_group)
buf.clear()
rv.append((query_req, unit_group))

async with asyncio.TaskGroup() as g:
g.create_task(persist_cache_task())
async with asyncio.TaskGroup() as compile_group:
for req in requests:
compile_group.create_task(recompile_request(req))
await compiled_queue.put(None)
for req, grp in self._db._eql_to_compiled.items():
if len(grp) == 1:
g.create_task(recompile_request(req))
return rv

async def apply_config_ops(self, conn, ops):
settings = self.get_config_spec()
Expand Down Expand Up @@ -1124,6 +1079,41 @@ cdef class DatabaseConnectionView:
if not lock._waiters:
del lock_table[query_req]

recompiled_cache = None
if (
not self.in_tx()
or len(query_unit_group) > 0
and query_unit_group[0].tx_commit
):
# Recompile all cached queries if:
# * Issued a DDL or committing a tx with DDL (recompilation
# before in-tx DDL needs to fix _in_tx_with_ddl caching 1st)
# * Config.auto_rebuild_query_cache is turned on
#
# Ideally we should compute the proper user_schema, database_config
# and system_config for recompilation from server/compiler.py with
# proper handling of config values. For now we just use the values
# in the current dbview and not support certain marginal cases.
user_schema = None
user_schema_version = None
for unit in query_unit_group:
if unit.tx_rollback:
break
if unit.user_schema:
user_schema = unit.user_schema
user_schema_version = unit.user_schema_version
if user_schema and not self.server.config_lookup(
"auto_rebuild_query_cache",
self.get_session_config(),
self.get_database_config(),
self.get_system_config(),
):
user_schema = None
if user_schema:
recompiled_cache = await self.recompile_cached_queries(
user_schema, user_schema_version
)

if use_metrics:
metrics.edgeql_query_compilations.inc(
1.0, self.tenant.get_instance_name(), 'compiler'
Expand All @@ -1136,6 +1126,7 @@ cdef class DatabaseConnectionView:
extra_counts=source.extra_counts(),
extra_blobs=source.extra_blobs(),
request=query_req,
recompiled_cache=recompiled_cache,
)

cdef inline _check_in_tx_error(self, query_unit_group):
Expand Down
3 changes: 2 additions & 1 deletion edb/server/protocol/binary.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,8 @@ cdef class EdgeConnection(frontend.FrontendConnection):
if len(units) == 1 and units[0].cache_sql:
conn = await self.get_pgcon()
try:
await execute.persist_cache(conn, _dbview, [(query_req, units)])
g = execute.build_cache_persistence_units([(query_req, units)])
await g.execute(conn, _dbview)
finally:
self.maybe_release_pgcon(conn)

Expand Down
Loading

0 comments on commit 818a3e5

Please sign in to comment.