Skip to content

Commit

Permalink
Add an auto_rebuild_query_cache_timeout config (#7518)
Browse files Browse the repository at this point in the history
Allow configuring a timeout for query cache recompilation.

Also add a test covering this behavior and some other basic cache behavior.
  • Loading branch information
msullivan authored Jul 2, 2024
1 parent 48a7649 commit 4f4855d
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 17 deletions.
2 changes: 1 addition & 1 deletion edb/buildmeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
# The merge conflict there is a nice reminder that you probably need
# to write a patch in edb/pgsql/patches.py, and then you should preserve
# the old value.
EDGEDB_CATALOG_VERSION = 2024_06_26_00_00
EDGEDB_CATALOG_VERSION = 2024_07_01_00_00
EDGEDB_MAJOR_VERSION = 6


Expand Down
6 changes: 6 additions & 0 deletions edb/lib/cfg.edgeql
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ ALTER TYPE cfg::AbstractConfig {
'Recompile all cached queries on DDL if enabled.';
};

CREATE PROPERTY auto_rebuild_query_cache_timeout -> std::duration {
CREATE ANNOTATION std::description :=
'Maximum time to spend recompiling cached queries on DDL.';
SET default := <std::duration>'60 seconds';
};

CREATE PROPERTY query_cache_mode -> cfg::QueryCacheMode {
SET default := cfg::QueryCacheMode.Default;
CREATE ANNOTATION cfg::affects_compilation := 'true';
Expand Down
44 changes: 32 additions & 12 deletions edb/server/dbview/dbview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1120,26 +1120,44 @@ cdef class DatabaseConnectionView:
concurrency_control = asyncio.Semaphore(compile_concurrency)
rv = []

recompile_timeout = self.server.config_lookup(
"auto_rebuild_query_cache_timeout",
self.get_session_config(),
self.get_database_config(),
self.get_system_config(),
)

loop = asyncio.get_running_loop()
t0 = loop.time()
if recompile_timeout is not None:
stop_time = t0 + recompile_timeout.to_microseconds() / 1e6
else:
stop_time = None

async def recompile_request(query_req: rpc.CompilationRequest):
async with concurrency_control:
try:
if stop_time is not None and loop.time() > stop_time:
return

database_config = self.get_database_config()
system_config = self.get_compilation_system_config()
query_req = copy.copy(query_req)
query_req.set_schema_version(schema_version)
query_req.set_database_config(database_config)
query_req.set_system_config(system_config)
unit_group, _, _ = await compiler_pool.compile(
self.dbname,
user_schema,
self.get_global_schema_pickle(),
self.reflection_cache,
database_config,
system_config,
query_req.serialize(),
"<unknown>",
client_id=self.tenant.client_id,
)
async with asyncio.timeout_at(stop_time):
unit_group, _, _ = await compiler_pool.compile(
self.dbname,
user_schema,
self.get_global_schema_pickle(),
self.reflection_cache,
database_config,
system_config,
query_req.serialize(),
"<unknown>",
client_id=self.tenant.client_id,
)
except Exception:
# ignore cache entry that cannot be recompiled
pass
Expand All @@ -1148,14 +1166,16 @@ cdef class DatabaseConnectionView:

async with asyncio.TaskGroup() as g:
req: rpc.CompilationRequest
for req, grp in self._db._eql_to_compiled.items():
# Reversed so that we compile more recently used first.
for req, grp in reversed(self._db._eql_to_compiled.items()):
if (
len(grp) == 1
# Only recompile queries from the *latest* version,
# to avoid quadratic slowdown problems.
and req.schema_version == self.schema_version
):
g.create_task(recompile_request(req))

return rv

async def apply_config_ops(self, conn, ops):
Expand Down
16 changes: 12 additions & 4 deletions edb/testbase/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,13 +697,21 @@ def _call_system_api(
con.close()


def parse_metrics(metrics: str) -> dict[str, float]:
    """Parse Prometheus text-exposition output into a {key: value} dict.

    Each sample line has the form ``name{labels} value [timestamp]``.
    The returned key is everything up to the first space (the metric name
    together with its label set, e.g. ``foo{tenant="x"}``) and the value
    is parsed as a float.  Comment lines (``# HELP`` / ``# TYPE``), blank
    lines, and lines carrying no value are skipped.

    Note: label values containing spaces are not supported — splitting on
    the first space would truncate the key.
    """
    res: dict[str, float] = {}
    for line in metrics.splitlines():
        # '#' lines are comments; a line without a space carries no sample.
        if line.startswith('#') or ' ' not in line:
            continue
        key, _, rest = line.partition(' ')
        # The exposition format allows an optional trailing timestamp
        # after the value; keep only the first field (the sample value).
        fields = rest.split()
        if not fields:
            continue
        res[key] = float(fields[0])
    return res


def _extract_background_errors(metrics: str) -> str | None:
non_zero = []

for line in metrics.splitlines():
if line.startswith('edgedb_server_background_errors_total'):
label, _, total = line.rpartition(' ')
total = float(total)
for label, total in parse_metrics(metrics).items():
if label.startswith('edgedb_server_background_errors_total'):
if total:
non_zero.append(
f'non-zero {label!r} metric: {total}'
Expand Down
78 changes: 78 additions & 0 deletions tests/test_server_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,84 @@ async def _test_connection(self, con):
transaction_state=protocol.TransactionState.NOT_IN_TRANSACTION,
)

    async def test_server_ops_cache_recompile_01(self):
        """Check query-cache recompilation on DDL and its configurable timeout.

        Exercises, in order:
        1. a repeated query is served from the compiled-query cache
           (the compilation counter does not move),
        2. DDL triggers automatic recompilation of cached queries in the
           background, so the next query still needs no compile,
        3. setting ``auto_rebuild_query_cache_timeout`` very low makes the
           background recompilation give up, so the next query must be
           compiled again (counter goes up by exactly one).

        Finally restarts the server on the same data directory to confirm
        the cache persists across restarts.
        """
        # Metric key for compiler-path compilations of the local tenant;
        # presumably this counts actual compiler invocations (cache misses).
        ckey = (
            'edgedb_server_edgeql_query_compilations_total'
            '{tenant="_localdev",path="compiler"}'
        )
        qry = 'select schema::Object { name }'

        with tempfile.TemporaryDirectory() as temp_dir:
            async with tb.start_edgedb_server(
                data_dir=temp_dir,
                default_auth_method=args.ServerAuthMethod.Trust,
            ) as sd:
                con = await sd.connect()
                try:
                    # Prime the cache with one compilation.
                    await con.query(qry)

                    # Querying a second time should hit the cache
                    cnt1 = tb.parse_metrics(sd.fetch_metrics()).get(ckey)
                    await con.query(qry)
                    cnt2 = tb.parse_metrics(sd.fetch_metrics()).get(ckey)
                    self.assertEqual(cnt1, cnt2)

                    await con.query('''
                        create type X
                    ''')

                    # We should have recompiled the cache when we created
                    # the type, so doing the query shouldn't cause another
                    # compile!
                    cnt1 = tb.parse_metrics(sd.fetch_metrics()).get(ckey)
                    await con.query(qry)
                    cnt2 = tb.parse_metrics(sd.fetch_metrics()).get(ckey)
                    self.assertEqual(cnt1, cnt2)

                    # Set the compilation timeout to 2ms.
                    #
                    # This should prevent recompilation from
                    # succeeding. If we ever make the compiler fast
                    # enough, we might need to change this :)
                    #
                    # We do 2ms instead of 1ms or something even smaller
                    # because uvloop's timer has ms granularity, and
                    # setting it to 2ms should typically ensure that it
                    # manages to start the compilation.
                    await con.execute(
                        "configure current database "
                        "set auto_rebuild_query_cache_timeout := "
                        "<duration>'2ms'"
                    )

                    await con.query('''
                        drop type X
                    ''')

                    # The background rebuild should have timed out, so this
                    # query must be compiled from scratch: exactly one more
                    # compilation is expected.
                    cnt1 = tb.parse_metrics(sd.fetch_metrics()).get(ckey)
                    await con.query(qry)
                    cnt2 = tb.parse_metrics(sd.fetch_metrics()).get(ckey)
                    self.assertEqual(cnt1 + 1, cnt2)

                finally:
                    await con.aclose()

            # Now restart the server to test the cache persistence.
            async with tb.start_edgedb_server(
                data_dir=temp_dir,
                default_auth_method=args.ServerAuthMethod.Trust,
            ) as sd:
                con = await sd.connect()
                try:
                    # It should hit the cache no problem.
                    cnt1 = tb.parse_metrics(sd.fetch_metrics()).get(ckey)
                    await con.query(qry)
                    cnt2 = tb.parse_metrics(sd.fetch_metrics()).get(ckey)
                    self.assertEqual(cnt1, cnt2)

                finally:
                    await con.aclose()

async def test_server_ops_downgrade_to_cleartext(self):
async with tb.start_edgedb_server(
binary_endpoint_security=args.ServerEndpointSecurityMode.Optional,
Expand Down

0 comments on commit 4f4855d

Please sign in to comment.