Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split recompile and writeback in transaction #6959

Merged
merged 5 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions edb/server/compiler/rpc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ cdef class CompilationRequest:
):
self._serializer = compilation_config_serializer

def __copy__(self):
    """Return a shallow copy of this CompilationRequest.

    Supports ``copy.copy(request)`` so callers can derive a new request
    (e.g. to re-key it with ``set_schema_version`` /
    ``set_database_config`` / ``set_system_config``) without mutating the
    original, which may still be used as a live cache key.

    NOTE(review): this copies every attribute explicitly, so it must be
    kept in sync whenever a field is added to CompilationRequest —
    a missed field would silently carry a default in the copy.
    """
    # Construct via the normal constructor so the serializer is wired up
    # the same way as for a fresh request.
    cdef CompilationRequest rv = CompilationRequest(self._serializer)
    rv.source = self.source
    rv.protocol_version = self.protocol_version
    rv.output_format = self.output_format
    rv.json_parameters = self.json_parameters
    rv.expect_one = self.expect_one
    rv.implicit_limit = self.implicit_limit
    rv.inline_typeids = self.inline_typeids
    rv.inline_typenames = self.inline_typenames
    rv.inline_objectids = self.inline_objectids
    rv.modaliases = self.modaliases
    rv.session_config = self.session_config
    rv.database_config = self.database_config
    rv.system_config = self.system_config
    rv.schema_version = self.schema_version
    # Cached derived state is copied as-is (shallow): the serialized form
    # and the computed cache key remain valid until a set_*() call
    # invalidates them on the copy.
    rv.serialized_cache = self.serialized_cache
    rv.cache_key = self.cache_key
    return rv

def update(
self,
source: edgeql.Source,
Expand Down
1 change: 1 addition & 0 deletions edb/server/dbview/dbview.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ cdef class CompiledQuery:
cdef public object extra_counts
cdef public object extra_blobs
cdef public object request
cdef public object recompiled_cache


cdef class DatabaseIndex:
Expand Down
119 changes: 55 additions & 64 deletions edb/server/dbview/dbview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ from typing import (

import asyncio
import base64
import copy
import json
import os.path
import pickle
Expand Down Expand Up @@ -85,12 +86,14 @@ cdef class CompiledQuery:
extra_counts=(),
extra_blobs=(),
request=None,
recompiled_cache=None,
):
self.query_unit_group = query_unit_group
self.first_extra = first_extra
self.extra_counts = extra_counts
self.extra_blobs = extra_blobs
self.request = request
self.recompiled_cache = recompiled_cache


cdef class Database:
Expand Down Expand Up @@ -197,7 +200,7 @@ cdef class Database:
query_req, unit_group = self._eql_to_compiled.cleanup_one()
if len(unit_group) == 1:
keys.append(query_req.get_cache_key())
if keys:
if keys and debug.flags.persistent_cache:
self.tenant.create_task(
self.tenant.evict_query_cache(self.name, keys),
interruptable=True,
Expand Down Expand Up @@ -909,39 +912,24 @@ cdef class DatabaseConnectionView:
self._reset_tx_state()
return side_effects

async def clear_cache_keys(self, conn) -> list[rpc.CompilationRequest]:
rows = await conn.sql_fetch(b'SELECT "edgedb"."_clear_query_cache"()')
rv = []
for row in rows:
query_req = rpc.CompilationRequest(
self.server.compilation_config_serializer
).deserialize(row[0], "<unknown>")
rv.append(query_req)
self._db._eql_to_compiled.pop(query_req, None)
execute.signal_query_cache_changes(self)
return rv

async def recompile_all(
self, conn, requests: typing.Iterable[rpc.CompilationRequest]
):
# Assume the size of compiler pool is 100, we'll issue 50 concurrent
# compilation requests at the same time, cache up to 150 results and
# persist in one backend round-trip, in parallel.
async def recompile_cached_queries(self, user_schema, schema_version):
compiler_pool = self.server.get_compiler_pool()
compile_concurrency = max(1, compiler_pool.get_size_hint() // 2)
concurrency_control = asyncio.Semaphore(compile_concurrency)
persist_batch_size = compile_concurrency * 3
compiled_queue = asyncio.Queue(persist_batch_size)
rv = []

async def recompile_request(query_req: rpc.CompilationRequest):
async with concurrency_control:
try:
schema_version = self.schema_version
database_config = self.get_database_config()
system_config = self.get_compilation_system_config()
result = await compiler_pool.compile(
query_req = copy.copy(query_req)
query_req.set_schema_version(schema_version)
query_req.set_database_config(database_config)
query_req.set_system_config(system_config)
unit_group, _, _ = await compiler_pool.compile(
self.dbname,
self.get_user_schema_pickle(),
user_schema,
self.get_global_schema_pickle(),
self.reflection_cache,
database_config,
Expand All @@ -951,49 +939,16 @@ cdef class DatabaseConnectionView:
client_id=self.tenant.client_id,
)
except Exception:
# discard cache entry that cannot be recompiled
self._db._eql_to_compiled.pop(query_req, None)
# ignore cache entry that cannot be recompiled
pass
else:
# schema_version, database_config and system_config are not
# serialized but only affect the cache key. We only update
# these values *after* the compilation so that we can evict
# the in-memory cache by the right key when recompilation
# fails in the `except` branch above.
query_req.set_schema_version(schema_version)
query_req.set_database_config(database_config)
query_req.set_system_config(system_config)

await compiled_queue.put((query_req, result[0]))

async def persist_cache_task():
if not debug.flags.func_cache:
# TODO(fantix): sync _query_cache in one implicit tx
await conn.sql_fetch(b'SELECT "edgedb"."_clear_query_cache"()')

buf = []
running = True
while running:
while len(buf) < persist_batch_size:
item = await compiled_queue.get()
if item is None:
running = False
break
buf.append(item)
if buf:
await execute.persist_cache(
conn, self, [item[:2] for item in buf]
)
for query_req, query_unit_group in buf:
self._db._cache_compiled_query(
query_req, query_unit_group)
buf.clear()
rv.append((query_req, unit_group))

async with asyncio.TaskGroup() as g:
g.create_task(persist_cache_task())
async with asyncio.TaskGroup() as compile_group:
for req in requests:
compile_group.create_task(recompile_request(req))
await compiled_queue.put(None)
for req, grp in self._db._eql_to_compiled.items():
if len(grp) == 1:
g.create_task(recompile_request(req))
return rv

async def apply_config_ops(self, conn, ops):
settings = self.get_config_spec()
Expand Down Expand Up @@ -1124,6 +1079,41 @@ cdef class DatabaseConnectionView:
if not lock._waiters:
del lock_table[query_req]

recompiled_cache = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this logic go in a separate function? (Fine if you don't think so)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was thinking about having all this decided in server/compiler/compiler.py or deeper where we can have per-statement "dbview" of session_config, database_config and all (it's like what dbview.pyx does in start()/on_success(), but it'll be too late for recompilation), so that we can handle scripts like:

configure session set auto_rebuild_query_cache := false;
create type Foo;
configure session set auto_rebuild_query_cache := true;

or use the correct config to recompile after scripts like:

configure current database set apply_access_policies := true;
create type Foo;

So basically this logic is temporarily here and I'm expecting it to be replaced by a simple if query_unit_group.should_recompile or something like that in the near future.

if (
not self.in_tx()
or len(query_unit_group) > 0
and query_unit_group[0].tx_commit
):
# Recompile all cached queries if:
# * Issued a DDL or committing a tx with DDL (recompilation
# before in-tx DDL needs to fix _in_tx_with_ddl caching 1st)
# * Config.auto_rebuild_query_cache is turned on
#
# Ideally we should compute the proper user_schema, database_config
# and system_config for recompilation from server/compiler.py with
# proper handling of config values. For now we just use the values
# in the current dbview and not support certain marginal cases.
user_schema = None
user_schema_version = None
for unit in query_unit_group:
if unit.tx_rollback:
break
if unit.user_schema:
user_schema = unit.user_schema
user_schema_version = unit.user_schema_version
if user_schema and not self.server.config_lookup(
"auto_rebuild_query_cache",
self.get_session_config(),
self.get_database_config(),
self.get_system_config(),
):
user_schema = None
if user_schema:
recompiled_cache = await self.recompile_cached_queries(
user_schema, user_schema_version
)

if use_metrics:
metrics.edgeql_query_compilations.inc(
1.0, self.tenant.get_instance_name(), 'compiler'
Expand All @@ -1136,6 +1126,7 @@ cdef class DatabaseConnectionView:
extra_counts=source.extra_counts(),
extra_blobs=source.extra_blobs(),
request=query_req,
recompiled_cache=recompiled_cache,
)

cdef inline _check_in_tx_error(self, query_unit_group):
Expand Down
3 changes: 2 additions & 1 deletion edb/server/protocol/binary.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,8 @@ cdef class EdgeConnection(frontend.FrontendConnection):
if len(units) == 1 and units[0].cache_sql:
conn = await self.get_pgcon()
try:
await execute.persist_cache(conn, _dbview, [(query_req, units)])
g = execute.build_cache_persistence_units([(query_req, units)])
await g.execute(conn, _dbview)
finally:
self.maybe_release_pgcon(conn)

Expand Down
Loading