From 780d9e7c4a34bfb47acb539641619146adf378de Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 31 May 2024 12:07:00 -0400 Subject: [PATCH 1/9] Cache cardinality instead of has_na_cardinality --- edgedb/protocol/protocol.pxd | 2 +- edgedb/protocol/protocol.pyx | 11 ++++++----- edgedb/protocol/protocol_v0.pyx | 8 ++++---- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/edgedb/protocol/protocol.pxd b/edgedb/protocol/protocol.pxd index 03f04964..6dccf624 100644 --- a/edgedb/protocol/protocol.pxd +++ b/edgedb/protocol/protocol.pxd @@ -74,7 +74,7 @@ cdef class QueryCodecsCache: cdef set(self, str query, OutputFormat output_format, int implicit_limit, bint inline_typenames, bint inline_typeids, - bint expect_one, bint has_na_cardinality, + bint expect_one, bytes cardinality, BaseCodec in_type, BaseCodec out_type, int capabilities) diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index 469d3cb4..505f92ce 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -114,7 +114,7 @@ cdef class QueryCodecsCache: cdef set( self, str query, OutputFormat output_format, int implicit_limit, bint inline_typenames, bint inline_typeids, - bint expect_one, bint has_na_cardinality, + bint expect_one, bytes cardinality, BaseCodec in_type, BaseCodec out_type, int capabilities, ): key = ( @@ -128,7 +128,7 @@ cdef class QueryCodecsCache: assert in_type is not None assert out_type is not None self.queries[key] = ( - has_na_cardinality, in_type, out_type, capabilities + cardinality, in_type, out_type, capabilities ) @@ -405,7 +405,7 @@ cdef class SansIOProtocol: inline_typenames, inline_typeids, expect_one, - new_cardinality == CARDINALITY_NOT_APPLICABLE, + new_cardinality, in_dc, out_dc, capabilities) elif mtype == STATE_DATA_DESC_MSG: @@ -501,6 +501,7 @@ cdef class SansIOProtocol: cdef: BaseCodec in_dc BaseCodec out_dc + bytes cardinality self.ensure_connected() self.reset_status() @@ -538,7 +539,7 @@ cdef class SansIOProtocol: state=state, ) - has_na_cardinality = parsed[0] == CARDINALITY_NOT_APPLICABLE + cardinality = parsed[0] in_dc = parsed[1] out_dc = parsed[2] capabilities = parsed[3] @@ -550,7 +551,7 @@ cdef class SansIOProtocol: inline_typenames, inline_typeids, expect_one, - has_na_cardinality, + cardinality, in_dc, out_dc, capabilities, diff --git a/edgedb/protocol/protocol_v0.pyx b/edgedb/protocol/protocol_v0.pyx index 399f803c..574adda6 100644 --- a/edgedb/protocol/protocol_v0.pyx +++ b/edgedb/protocol/protocol_v0.pyx @@ -294,7 +294,7 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): inline_typenames, inline_typeids, expect_one, - new_cardinality == CARDINALITY_NOT_APPLICABLE, + new_cardinality, in_dc, out_dc, capabilities) re_exec = True @@ -412,7 +412,7 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): inline_typenames, inline_typeids, expect_one, - cardinality == CARDINALITY_NOT_APPLICABLE, + cardinality, in_dc, out_dc, capabilities, @@ -421,11 +421,11 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): ret = await self._legacy_execute(in_dc, out_dc, args, kwargs) else: - has_na_cardinality = codecs[0] + cardinality = codecs[0] in_dc = codecs[1] out_dc = codecs[2] - if required_one and has_na_cardinality: + if required_one and cardinality == CARDINALITY_NOT_APPLICABLE: methname = _QUERY_SINGLE_METHOD[required_one][output_format] raise errors.InterfaceError( f'query cannot be executed with {methname}() as it ' From bf8f3281f6da68b243ce63ad89fef3e691926959 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Wed, 29 May 2024 16:19:51 -0400 Subject: [PATCH 2/9] Extract ExecuteContext as in/out argument So that we can pass out the parsed capabilities to control retries. This also allows further code optimization. --- edgedb/_testbase.py | 6 ++ edgedb/abstract.py | 30 ++++++ edgedb/base_client.py | 63 ++--------- edgedb/protocol/protocol.pxd | 24 +++++ edgedb/protocol/protocol.pyx | 181 ++++++++++++++++---------------- edgedb/protocol/protocol_v0.pyx | 94 ++++++++--------- tests/test_sync_retry.py | 75 +++++++++++++ tests/test_sync_tx.py | 2 +- 8 files changed, 280 insertions(+), 195 deletions(-) diff --git a/edgedb/_testbase.py b/edgedb/_testbase.py index 5680036c..e5b59508 100644 --- a/edgedb/_testbase.py +++ b/edgedb/_testbase.py @@ -372,10 +372,16 @@ def make_test_client( database='edgedb', user='edgedb', password='test', + host=..., + port=..., connection_class=..., ): conargs = cls.get_connect_args( cluster=cluster, database=database, user=user, password=password) + if host is not ...: + conargs['host'] = host + if port is not ...: + conargs['port'] = port if connection_class is ...: connection_class = ( asyncio_client.AsyncIOConnection diff --git a/edgedb/abstract.py b/edgedb/abstract.py index 7e8f4f6a..a1c6cfa4 100644 --- a/edgedb/abstract.py +++ b/edgedb/abstract.py @@ -65,12 +65,42 @@ class QueryContext(typing.NamedTuple): retry_options: typing.Optional[options.RetryOptions] state: typing.Optional[options.State] + def lower( + self, *, allow_capabilities: enums.Capability + ) -> protocol.ExecuteContext: + return protocol.ExecuteContext( + query=self.query.query, + args=self.query.args, + kwargs=self.query.kwargs, + reg=self.cache.codecs_registry, + qc=self.cache.query_cache, + output_format=self.query_options.output_format, + expect_one=self.query_options.expect_one, + required_one=self.query_options.required_one, + allow_capabilities=allow_capabilities, + state=self.state.as_dict() if self.state else None, + ) + class ExecuteContext(typing.NamedTuple): query: QueryWithArgs cache: QueryCache state: typing.Optional[options.State] + def lower( + self, *, allow_capabilities: enums.Capability + ) -> protocol.ExecuteContext: + return protocol.ExecuteContext( + query=self.query.query, + args=self.query.args, + kwargs=self.query.kwargs, + reg=self.cache.codecs_registry, + qc=self.cache.query_cache, + output_format=protocol.OutputFormat.NONE, + allow_capabilities=allow_capabilities, + state=self.state.as_dict() if self.state else None, + ) + @dataclasses.dataclass class DescribeContext: diff --git a/edgedb/base_client.py b/edgedb/base_client.py index d8c33767..0272cc13 100644 --- a/edgedb/base_client.py +++ b/edgedb/base_client.py @@ -183,17 +183,7 @@ async def privileged_execute( ) else: await self._protocol.execute( - query=execute_context.query.query, - args=execute_context.query.args, - kwargs=execute_context.query.kwargs, - reg=execute_context.cache.codecs_registry, - qc=execute_context.cache.query_cache, - output_format=protocol.OutputFormat.NONE, - allow_capabilities=enums.Capability.ALL, - state=( - execute_context.state.as_dict() - if execute_context.state else None - ), + execute_context.lower(allow_capabilities=enums.Capability.ALL) ) def is_in_transaction(self) -> bool: @@ -211,56 +201,31 @@ async def raw_query(self, query_context: abstract.QueryContext): await self.connect() reconnect = False - capabilities = None i = 0 - args = dict( - query=query_context.query.query, - args=query_context.query.args, - kwargs=query_context.query.kwargs, - reg=query_context.cache.codecs_registry, - qc=query_context.cache.query_cache, - output_format=query_context.query_options.output_format, - expect_one=query_context.query_options.expect_one, - required_one=query_context.query_options.required_one, - ) if self._protocol.is_legacy: - args["allow_capabilities"] = enums.Capability.LEGACY_EXECUTE + allow_capabilities = enums.Capability.LEGACY_EXECUTE else: - args["allow_capabilities"] = enums.Capability.EXECUTE - if query_context.state is not None: - args["state"] = query_context.state.as_dict() + allow_capabilities = enums.Capability.EXECUTE + ctx = query_context.lower(allow_capabilities=allow_capabilities) while True: i += 1 try: if reconnect: await self.connect(single_attempt=True) if self._protocol.is_legacy: - return await self._protocol.legacy_execute_anonymous( - **args - ) + return await self._protocol.legacy_execute_anonymous(ctx) else: - return await self._protocol.query(**args) + return await self._protocol.query(ctx) except errors.EdgeDBError as e: if query_context.retry_options is None: raise if not e.has_tag(errors.SHOULD_RETRY): raise e - if capabilities is None: - cache_item = query_context.cache.query_cache.get( - query_context.query.query, - query_context.query_options.output_format, - implicit_limit=0, - inline_typenames=False, - inline_typeids=False, - expect_one=query_context.query_options.expect_one, - ) - if cache_item is not None: - _, _, _, capabilities = cache_item # A query is read-only if it has no capabilities i.e. # capabilities == 0. Read-only queries are safe to retry. # Explicit transaction conflicts as well. if ( - capabilities != 0 + ctx.capabilities != 0 and not isinstance(e, errors.TransactionConflictError) ): raise e @@ -281,17 +246,9 @@ async def _execute(self, execute_context: abstract.ExecuteContext) -> None: ) else: await self._protocol.execute( - query=execute_context.query.query, - args=execute_context.query.args, - kwargs=execute_context.query.kwargs, - reg=execute_context.cache.codecs_registry, - qc=execute_context.cache.query_cache, - output_format=protocol.OutputFormat.NONE, - allow_capabilities=enums.Capability.EXECUTE, - state=( - execute_context.state.as_dict() - if execute_context.state else None - ), + execute_context.lower( + allow_capabilities=enums.Capability.EXECUTE + ) ) async def describe( diff --git a/edgedb/protocol/protocol.pxd b/edgedb/protocol/protocol.pxd index 6dccf624..3befd59b 100644 --- a/edgedb/protocol/protocol.pxd +++ b/edgedb/protocol/protocol.pxd @@ -78,6 +78,30 @@ cdef class QueryCodecsCache: BaseCodec in_type, BaseCodec out_type, int capabilities) +cdef class ExecuteContext: + cdef: + # Input arguments + str query + object args + object kwargs + CodecsRegistry reg + QueryCodecsCache qc + OutputFormat output_format + bint expect_one + bint required_one + int implicit_limit + bint inline_typenames + bint inline_typeids + uint64_t allow_capabilities + object state + + # Contextual variables + bytes cardinality + BaseCodec in_dc + BaseCodec out_dc + readonly uint64_t capabilities + + cdef class SansIOProtocol: cdef: diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index 505f92ce..0a022dd9 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -132,6 +132,43 @@ cdef class QueryCodecsCache: ) +cdef class ExecuteContext: + def __init__( + self, + *, + query: str, + args, + kwargs, + reg: CodecsRegistry, + qc: QueryCodecsCache, + output_format: OutputFormat, + expect_one: bool = False, + required_one: bool = False, + implicit_limit: int = 0, + inline_typenames: bool = False, + inline_typeids: bool = False, + allow_capabilities: enums.Capability = enums.Capability.ALL, + state: typing.Optional[dict] = None, + ): + self.query = query + self.args = args + self.kwargs = kwargs + self.reg = reg + self.qc = qc + self.output_format = output_format + self.expect_one = bool(expect_one) + self.required_one = bool(required_one) + self.implicit_limit = implicit_limit + self.inline_typenames = bool(inline_typenames) + self.inline_typeids = bool(inline_typeids) + self.allow_capabilities = allow_capabilities + self.state = state + + self.cardinality = None + self.in_dc = self.out_dc = None + self.capabilities = 0 + + cdef class SansIOProtocol: def __init__(self, con_params): @@ -330,25 +367,7 @@ cdef class SansIOProtocol: return cardinality, in_dc, out_dc, capabilities - async def _execute( - self, - *, - query: str, - args, - kwargs, - reg: CodecsRegistry, - qc: QueryCodecsCache, - output_format: object, - expect_one: bint, - required_one: bint, - implicit_limit: int, - inline_typenames: bint, - inline_typeids: bint, - allow_capabilities: enums.Capability = enums.Capability.ALL, - in_dc: BaseCodec, - out_dc: BaseCodec, - state: typing.Optional[dict] = None, - ): + async def _execute(self, ctx: ExecuteContext): cdef: WriteBuffer packet WriteBuffer buf @@ -357,6 +376,22 @@ cdef class SansIOProtocol: object result bytes new_cardinality = None + str query = ctx.query + object args = ctx.args + object kwargs = ctx.kwargs + CodecsRegistry reg = ctx.reg + QueryCodecsCache qc = ctx.qc + OutputFormat output_format = ctx.output_format + bint expect_one = ctx.expect_one + bint required_one = ctx.required_one + int implicit_limit = ctx.implicit_limit + bint inline_typenames = ctx.inline_typenames + bint inline_typeids = ctx.inline_typeids + uint64_t allow_capabilities = ctx.allow_capabilities + BaseCodec in_dc = ctx.in_dc + BaseCodec out_dc = ctx.out_dc + object state = ctx.state + params = self.encode_parse_params( query=query, output_format=output_format, @@ -407,6 +442,10 @@ cdef class SansIOProtocol: expect_one, new_cardinality, in_dc, out_dc, capabilities) + ctx.cardinality = new_cardinality + ctx.in_dc = in_dc + ctx.out_dc = out_dc + ctx.capabilities = capabilities elif mtype == STATE_DATA_DESC_MSG: self.parse_describe_state_message() @@ -481,28 +520,26 @@ cdef class SansIOProtocol: else: return NULL_CODEC_ID, EMPTY_NULL_DATA - async def execute( - self, - *, - query: str, - args, - kwargs, - reg: CodecsRegistry, - qc: QueryCodecsCache, - output_format: object, - expect_one: bint = False, - required_one: bool = False, - implicit_limit: int = 0, - inline_typenames: bool = False, - inline_typeids: bool = False, - allow_capabilities: enums.Capability = enums.Capability.ALL, - state: typing.Optional[dict] = None, - ): + async def execute(self, ctx: ExecuteContext): cdef: BaseCodec in_dc BaseCodec out_dc bytes cardinality + str query = ctx.query + object args = ctx.args + object kwargs = ctx.kwargs + CodecsRegistry reg = ctx.reg + QueryCodecsCache qc = ctx.qc + OutputFormat output_format = ctx.output_format + bint expect_one = ctx.expect_one + bint required_one = ctx.required_one + int implicit_limit = ctx.implicit_limit + bint inline_typenames = ctx.inline_typenames + bint inline_typeids = ctx.inline_typeids + uint64_t allow_capabilities = ctx.allow_capabilities + object state = ctx.state + self.ensure_connected() self.reset_status() @@ -515,8 +552,10 @@ cdef class SansIOProtocol: expect_one) if codecs is not None: + ctx.cardinality = codecs[0] in_dc = codecs[1] out_dc = codecs[2] + ctx.capabilities = codecs[3] elif not args and not kwargs and not required_one: # We don't have knowledge about the in/out desc of the command, but # the caller didn't provide any arguments, so let's try using NULL @@ -556,79 +595,39 @@ cdef class SansIOProtocol: out_dc, capabilities, ) + ctx.cardinality = cardinality + ctx.capabilities = capabilities - return await self._execute( - query=query, - args=args, - kwargs=kwargs, - reg=reg, - qc=qc, - output_format=output_format, - expect_one=expect_one, - required_one=required_one, - implicit_limit=implicit_limit, - inline_typenames=inline_typenames, - inline_typeids=inline_typeids, - allow_capabilities=allow_capabilities, - in_dc=in_dc, - out_dc=out_dc, - state=state, - ) + ctx.in_dc = in_dc + ctx.out_dc = out_dc - async def query( - self, - *, - query: str, - args, - kwargs, - reg: CodecsRegistry, - qc: QueryCodecsCache, - output_format: object, - expect_one: bint = False, - required_one: bool = False, - implicit_limit: int = 0, - inline_typenames: bool = False, - inline_typeids: bool = False, - allow_capabilities: enums.Capability = enums.Capability.ALL, - state: typing.Optional[dict] = None, - ): - ret = await self.execute( - query=query, - args=args, - kwargs=kwargs, - reg=reg, - qc=qc, - output_format=output_format, - expect_one=expect_one, - required_one=required_one, - implicit_limit=implicit_limit, - inline_typenames=inline_typenames, - inline_typeids=inline_typeids, - allow_capabilities=allow_capabilities, - state=state, - ) + return await self._execute(ctx) - if expect_one: - if ret or not required_one: + async def query(self, ctx: ExecuteContext): + ret = await self.execute(ctx) + if ctx.expect_one: + if ret or not ctx.required_one: if ret: return ret[0] else: - if output_format == OutputFormat.JSON: + if ctx.output_format == OutputFormat.JSON: return 'null' else: return None else: - methname = _QUERY_SINGLE_METHOD[required_one][output_format] + methname = ( + _QUERY_SINGLE_METHOD[ctx.required_one][ctx.output_format] + ) raise errors.NoDataError( f'query executed via {methname}() returned no data') else: if ret: - if output_format == OutputFormat.JSON: + if ctx.output_format == OutputFormat.JSON: return ret[0] else: return ret else: - if output_format == OutputFormat.JSON: + if ctx.output_format == OutputFormat.JSON: return '[]' else: return ret diff --git a/edgedb/protocol/protocol_v0.pyx b/edgedb/protocol/protocol_v0.pyx index 574adda6..bcde081c 100644 --- a/edgedb/protocol/protocol_v0.pyx +++ b/edgedb/protocol/protocol_v0.pyx @@ -225,24 +225,7 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): return result - async def _legacy_optimistic_execute( - self, - *, - query: str, - args, - kwargs, - reg: CodecsRegistry, - qc: QueryCodecsCache, - output_format: object, - expect_one: bint, - required_one: bint, - implicit_limit: int, - inline_typenames: bint, - inline_typeids: bint, - allow_capabilities: typing.Optional[int] = None, - in_dc: BaseCodec, - out_dc: BaseCodec, - ): + async def _legacy_optimistic_execute(self, ctx: ExecuteContext): cdef: WriteBuffer packet WriteBuffer buf @@ -251,6 +234,21 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): object result bytes new_cardinality = None + str query = ctx.query + object args = ctx.args + object kwargs = ctx.kwargs + CodecsRegistry reg = ctx.reg + QueryCodecsCache qc = ctx.qc + OutputFormat output_format = ctx.output_format + bint expect_one = ctx.expect_one + bint required_one = ctx.required_one + int implicit_limit = ctx.implicit_limit + bint inline_typenames = ctx.inline_typenames + bint inline_typeids = ctx.inline_typeids + uint64_t allow_capabilities = ctx.allow_capabilities + BaseCodec in_dc = ctx.in_dc + BaseCodec out_dc = ctx.out_dc + buf = WriteBuffer.new_message(EXECUTE_MSG) self.legacy_write_execute_headers( buf, implicit_limit, inline_typenames, inline_typeids, @@ -296,6 +294,11 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): expect_one, new_cardinality, in_dc, out_dc, capabilities) + ctx.cardinality = new_cardinality + ctx.in_dc = in_dc + ctx.out_dc = out_dc + ctx.capabilities = capabilities + re_exec = True elif mtype == DATA_MSG: @@ -351,26 +354,24 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): else: return result - async def legacy_execute_anonymous( - self, - *, - query: str, - args, - kwargs, - reg: CodecsRegistry, - qc: QueryCodecsCache, - output_format: object, - expect_one: bint = False, - required_one: bool = False, - implicit_limit: int = 0, - inline_typenames: bool = False, - inline_typeids: bool = False, - allow_capabilities: enums.Capability = enums.Capability.ALL, - ): + async def legacy_execute_anonymous(self, ctx: ExecuteContext): cdef: BaseCodec in_dc BaseCodec out_dc + str query = ctx.query + object args = ctx.args + object kwargs = ctx.kwargs + CodecsRegistry reg = ctx.reg + QueryCodecsCache qc = ctx.qc + OutputFormat output_format = ctx.output_format + bint expect_one = ctx.expect_one + bint required_one = ctx.required_one + int implicit_limit = ctx.implicit_limit + bint inline_typenames = ctx.inline_typenames + bint inline_typeids = ctx.inline_typeids + uint64_t allow_capabilities = ctx.allow_capabilities + self.ensure_connected() self.reset_status() @@ -417,6 +418,10 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): out_dc, capabilities, ) + ctx.cardinality = cardinality + ctx.in_dc = in_dc + ctx.out_dc = out_dc + ctx.capabilities = capabilities ret = await self._legacy_execute(in_dc, out_dc, args, kwargs) @@ -424,6 +429,10 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): cardinality = codecs[0] in_dc = codecs[1] out_dc = codecs[2] + ctx.cardinality = cardinality + ctx.in_dc = in_dc + ctx.out_dc = out_dc + ctx.capabilities = codecs[3] if required_one and cardinality == CARDINALITY_NOT_APPLICABLE: methname = _QUERY_SINGLE_METHOD[required_one][output_format] @@ -431,22 +440,7 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): f'query cannot be executed with {methname}() as it ' f'does not return any data') - ret = await self._legacy_optimistic_execute( - query=query, - args=args, - kwargs=kwargs, - reg=reg, - qc=qc, - output_format=output_format, - expect_one=expect_one, - required_one=required_one, - implicit_limit=implicit_limit, - inline_typenames=inline_typenames, - inline_typeids=inline_typeids, - allow_capabilities=allow_capabilities, - in_dc=in_dc, - out_dc=out_dc, - ) + ret = await self._legacy_optimistic_execute(ctx) if expect_one: if ret or not required_one: diff --git a/tests/test_sync_retry.py b/tests/test_sync_retry.py index 831f0964..ae32c633 100644 --- a/tests/test_sync_retry.py +++ b/tests/test_sync_retry.py @@ -17,7 +17,9 @@ # +import asyncio import threading +import queue import unittest.mock from concurrent import futures @@ -254,3 +256,76 @@ def test_sync_transaction_interface_errors(self): with tx: with tx: pass + + def test_sync_retry_parse(self): + loop = asyncio.new_event_loop() + q = queue.Queue() + + async def init(): + return asyncio.Event(), asyncio.Event() + + reconnect, terminate = loop.run_until_complete(init()) + + async def proxy(r, w): + try: + while True: + buf = await r.read(65536) + if not buf: + w.close() + break + w.write(buf) + except asyncio.CancelledError: + pass + + async def cb(ri, wi): + try: + args = self.get_connect_args() + ro, wo = await asyncio.open_connection( + args["host"], args["port"] + ) + try: + fs = [ + asyncio.create_task(proxy(ri, wo)), + asyncio.create_task(proxy(ro, wi)), + asyncio.create_task(terminate.wait()), + ] + if not reconnect.is_set(): + fs.append(asyncio.create_task(reconnect.wait())) + _, pending = await asyncio.wait( + fs, return_when=asyncio.FIRST_COMPLETED + ) + for f in pending: + f.cancel() + finally: + wo.close() + finally: + wi.close() + + async def proxy_server(): + srv = await asyncio.start_server(cb, host="127.0.0.1", port=0) + try: + q.put(srv.sockets[0].getsockname()[1]) + await terminate.wait() + finally: + srv.close() + await srv.wait_closed() + + with futures.ThreadPoolExecutor(1) as pool: + pool.submit(loop.run_until_complete, proxy_server()) + try: + client = self.make_test_client( + host="127.0.0.1", + port=q.get(), + database=self.get_database_name(), + ) + + # Fill the connection pool with a healthy connection + self.assertEqual(client.query_single("SELECT 42"), 42) + + # Cut the connection to simulate an Internet interruption + loop.call_soon_threadsafe(reconnect.set) + + # Run a new query that was never compiled, retry should work + self.assertEqual(client.query_single("SELECT 1*2+3-4"), 1) + finally: + loop.call_soon_threadsafe(terminate.set) diff --git a/tests/test_sync_tx.py b/tests/test_sync_tx.py index 3ed2fc55..497af782 100644 --- a/tests/test_sync_tx.py +++ b/tests/test_sync_tx.py @@ -102,7 +102,7 @@ def test_sync_transaction_commit_failure(self): def test_sync_transaction_exclusive(self): for tx in self.client.transaction(): with tx: - query = "select sys::_sleep(0.01)" + query = "select sys::_sleep(0.5)" with ThreadPoolExecutor(max_workers=2) as executor: f1 = executor.submit(tx.execute, query) f2 = executor.submit(tx.execute, query) From b00c5ec5e6d5419bc30f612fb919d5c61abef5a6 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 31 May 2024 11:29:52 -0400 Subject: [PATCH 3/9] Fix encode_parse_params() to use ExecuteContext --- edgedb/protocol/protocol.pxd | 12 +---- edgedb/protocol/protocol.pyx | 94 +++++++++--------------------------- 2 files changed, 24 insertions(+), 82 deletions(-) diff --git a/edgedb/protocol/protocol.pxd b/edgedb/protocol/protocol.pxd index 3befd59b..cca2b956 100644 --- a/edgedb/protocol/protocol.pxd +++ b/edgedb/protocol/protocol.pxd @@ -165,17 +165,7 @@ cdef class SansIOProtocol: cdef ensure_connected(self) - cdef WriteBuffer encode_parse_params( - self, - str query, - object output_format, - bint expect_one, - int implicit_limit, - bint inline_typenames, - bint inline_typeids, - uint64_t allow_capabilities, - object state, - ) + cdef WriteBuffer encode_parse_params(self, ExecuteContext ctx) include "protocol_v0.pxd" diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index 0a022dd9..32e8b71b 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -246,54 +246,31 @@ cdef class SansIOProtocol: raise errors.ClientConnectionClosedError( 'the connection has been closed') - cdef WriteBuffer encode_parse_params( - self, - str query, - object output_format, - bint expect_one, - int implicit_limit, - bint inline_typenames, - bint inline_typeids, - uint64_t allow_capabilities, - object state, - ): + cdef WriteBuffer encode_parse_params(self, ExecuteContext ctx): cdef: WriteBuffer buf compilation_flags = enums.CompilationFlag.INJECT_OUTPUT_OBJECT_IDS - if inline_typenames: + if ctx.inline_typenames: compilation_flags |= enums.CompilationFlag.INJECT_OUTPUT_TYPE_NAMES - if inline_typeids: + if ctx.inline_typeids: compilation_flags |= enums.CompilationFlag.INJECT_OUTPUT_TYPE_IDS buf = WriteBuffer.new() - buf.write_int64(allow_capabilities) + buf.write_int64(ctx.allow_capabilities) buf.write_int64(compilation_flags) - buf.write_int64(implicit_limit) - buf.write_byte(output_format) - buf.write_byte(CARDINALITY_ONE if expect_one else CARDINALITY_MANY) - buf.write_len_prefixed_utf8(query) + buf.write_int64(ctx.implicit_limit) + buf.write_byte(ctx.output_format) + buf.write_byte(CARDINALITY_ONE if ctx.expect_one else CARDINALITY_MANY) + buf.write_len_prefixed_utf8(ctx.query) - state_type_id, state_data = self.encode_state(state) + state_type_id, state_data = self.encode_state(ctx.state) buf.write_bytes(state_type_id) buf.write_bytes(state_data) return buf - async def _parse( - self, - query: str, - *, - reg: CodecsRegistry, - output_format: OutputFormat=OutputFormat.BINARY, - expect_one: bint=False, - required_one: bool=False, - implicit_limit: int=0, - inline_typenames: bool=False, - inline_typeids: bool=False, - allow_capabilities: enums.Capability = enums.Capability.ALL, - state: typing.Optional[dict] = None, - ): + async def _parse(self, ctx: ExecuteContext): cdef: WriteBuffer buf, params char mtype @@ -310,16 +287,7 @@ cdef class SansIOProtocol: buf = WriteBuffer.new_message(PREPARE_MSG) buf.write_int16(0) # no headers - params = self.encode_parse_params( - query=query, - output_format=output_format, - expect_one=expect_one, - implicit_limit=implicit_limit, - inline_typenames=inline_typenames, - inline_typeids=inline_typeids, - allow_capabilities=allow_capabilities, - state=state, - ) + params = self.encode_parse_params(ctx) buf.write_buffer(params) buf.end_message() @@ -335,16 +303,20 @@ cdef class SansIOProtocol: try: if mtype == STMT_DATA_DESC_MSG: capabilities, cardinality, in_dc, out_dc = \ - self.parse_describe_type_message(reg) + self.parse_describe_type_message(ctx.reg) elif mtype == STATE_DATA_DESC_MSG: self.parse_describe_state_message() elif mtype == ERROR_RESPONSE_MSG: exc = self.parse_error_message() - exc._query = query + exc._query = ctx.query exc = self._amend_parse_error( - exc, output_format, expect_one, required_one) + exc, + ctx.output_format, + ctx.expect_one, + ctx.required_one, + ) elif mtype == READY_FOR_COMMAND_MSG: self.parse_sync_message() @@ -358,9 +330,9 @@ cdef class SansIOProtocol: if exc is not None: raise exc - if required_one and cardinality == CARDINALITY_NOT_APPLICABLE: - assert output_format != OutputFormat.NONE - methname = _QUERY_SINGLE_METHOD[required_one][output_format] + if ctx.required_one and cardinality == CARDINALITY_NOT_APPLICABLE: + assert ctx.output_format != OutputFormat.NONE + methname = _QUERY_SINGLE_METHOD[ctx.required_one][ctx.output_format] raise errors.InterfaceError( f'query cannot be executed with {methname}() as it ' f'does not return any data') @@ -392,16 +364,7 @@ cdef class SansIOProtocol: BaseCodec out_dc = ctx.out_dc object state = ctx.state - params = self.encode_parse_params( - query=query, - output_format=output_format, - expect_one=expect_one, - implicit_limit=implicit_limit, - inline_typenames=inline_typenames, - inline_typeids=inline_typeids, - allow_capabilities=allow_capabilities, - state=state, - ) + params = self.encode_parse_params(ctx) buf = WriteBuffer.new_message(EXECUTE_MSG) buf.write_int16(0) # no headers @@ -565,18 +528,7 @@ cdef class SansIOProtocol: # command is already executed. in_dc = out_dc = NULL_CODEC else: - parsed = await self._parse( - query, - reg=reg, - output_format=output_format, - expect_one=expect_one, - required_one=required_one, - implicit_limit=implicit_limit, - inline_typenames=inline_typenames, - inline_typeids=inline_typeids, - allow_capabilities=allow_capabilities, - state=state, - ) + parsed = await self._parse(ctx) cardinality = parsed[0] in_dc = parsed[1] From 817c32c722f2b2cf56ce0b4b5625c4bf4ad1e16c Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 31 May 2024 11:38:16 -0400 Subject: [PATCH 4/9] Fix parse_describe_type_message() to use ExecuteContext --- edgedb/protocol/protocol.pxd | 2 +- edgedb/protocol/protocol.pyx | 34 ++++++++++++++-------------------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/edgedb/protocol/protocol.pxd b/edgedb/protocol/protocol.pxd index cca2b956..08c3b6f0 100644 --- a/edgedb/protocol/protocol.pxd +++ b/edgedb/protocol/protocol.pxd @@ -137,7 +137,7 @@ cdef class SansIOProtocol: cdef parse_data_messages(self, BaseCodec out_dc, result) cdef parse_sync_message(self) cdef parse_command_complete_message(self) - cdef parse_describe_type_message(self, CodecsRegistry reg) + cdef parse_describe_type_message(self, ExecuteContext ctx) cdef parse_describe_state_message(self) cdef parse_type_data(self, CodecsRegistry reg) cdef _amend_parse_error( diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index 32e8b71b..7676e293 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -302,8 +302,11 @@ cdef class SansIOProtocol: try: if mtype == STMT_DATA_DESC_MSG: - capabilities, cardinality, in_dc, out_dc = \ - self.parse_describe_type_message(ctx.reg) + self.parse_describe_type_message(ctx) + cardinality = ctx.cardinality + capabilities = ctx.capabilities + in_dc = ctx.in_dc + out_dc = ctx.out_dc elif mtype == STATE_DATA_DESC_MSG: self.parse_describe_state_message() @@ -346,7 +349,6 @@ cdef class SansIOProtocol: WriteBuffer params char mtype object result - bytes new_cardinality = None str query = ctx.query object args = ctx.args @@ -393,8 +395,7 @@ cdef class SansIOProtocol: try: if mtype == STMT_DATA_DESC_MSG: # our in/out type spec is out-dated - capabilities, new_cardinality, in_dc, out_dc = \ - self.parse_describe_type_message(reg) + self.parse_describe_type_message(ctx) qc.set( query, @@ -403,12 +404,10 @@ cdef class SansIOProtocol: inline_typenames, inline_typeids, expect_one, - new_cardinality, - in_dc, out_dc, capabilities) - ctx.cardinality = new_cardinality - ctx.in_dc = in_dc - ctx.out_dc = out_dc - ctx.capabilities = capabilities + ctx.cardinality == CARDINALITY_NOT_APPLICABLE, + ctx.in_dc, ctx.out_dc, ctx.capabilities) + in_dc = ctx.in_dc + out_dc = ctx.out_dc elif mtype == STATE_DATA_DESC_MSG: self.parse_describe_state_message() @@ -1073,22 +1072,17 @@ cdef class SansIOProtocol: (in_dc).encode_args(buf, kwargs) - cdef parse_describe_type_message(self, CodecsRegistry reg): + cdef parse_describe_type_message(self, ExecuteContext ctx): assert self.buffer.get_message_type() == COMMAND_DATA_DESC_MSG - cdef: - bytes cardinality - try: self.ignore_headers() - capabilities = self.buffer.read_int64() - cardinality = self.buffer.read_byte() - in_dc, out_dc = self.parse_type_data(reg) + ctx.capabilities = self.buffer.read_int64() + ctx.cardinality = self.buffer.read_byte() + ctx.in_dc, ctx.out_dc = self.parse_type_data(ctx.reg) finally: self.buffer.finish_message() - return capabilities, cardinality, in_dc, out_dc - cdef parse_describe_state_message(self): assert self.buffer.get_message_type() == STATE_DATA_DESC_MSG try: From af71feb8f81d66a9313659e56c4233216a5a7581 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 31 May 2024 12:34:30 -0400 Subject: [PATCH 5/9] Extract ExecuteContext.has_na_cardinality() --- edgedb/protocol/protocol.pxd | 2 ++ edgedb/protocol/protocol.pyx | 11 ++++++----- edgedb/protocol/protocol_v0.pyx | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/edgedb/protocol/protocol.pxd b/edgedb/protocol/protocol.pxd index 08c3b6f0..9f6c652e 100644 --- a/edgedb/protocol/protocol.pxd +++ b/edgedb/protocol/protocol.pxd @@ -101,6 +101,8 @@ cdef class ExecuteContext: BaseCodec out_dc readonly uint64_t capabilities + cdef inline bint has_na_cardinality(self) + cdef class SansIOProtocol: diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index 7676e293..4da8fcd0 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -168,6 +168,9 @@ cdef class ExecuteContext: self.in_dc = self.out_dc = None self.capabilities = 0 + cdef inline bint has_na_cardinality(self): + return self.cardinality == CARDINALITY_NOT_APPLICABLE + cdef class SansIOProtocol: @@ -279,7 +282,6 @@ cdef class SansIOProtocol: int16_t type_size bytes in_type_id bytes out_type_id - bytes cardinality if not self.connected: raise RuntimeError('not connected') @@ -303,7 +305,6 @@ cdef class SansIOProtocol: try: if mtype == STMT_DATA_DESC_MSG: self.parse_describe_type_message(ctx) - cardinality = ctx.cardinality capabilities = ctx.capabilities in_dc = ctx.in_dc out_dc = ctx.out_dc @@ -333,14 +334,14 @@ cdef class SansIOProtocol: if exc is not None: raise exc - if ctx.required_one and cardinality == CARDINALITY_NOT_APPLICABLE: + if ctx.required_one and ctx.has_na_cardinality(): assert ctx.output_format != OutputFormat.NONE methname = _QUERY_SINGLE_METHOD[ctx.required_one][ctx.output_format] raise errors.InterfaceError( f'query cannot be executed with {methname}() as it ' f'does not return any data') - return cardinality, in_dc, out_dc, capabilities + return ctx.cardinality, in_dc, out_dc, capabilities async def _execute(self, ctx: ExecuteContext): cdef: @@ -404,7 +405,7 @@ cdef class SansIOProtocol: inline_typenames, inline_typeids, expect_one, - ctx.cardinality == CARDINALITY_NOT_APPLICABLE, + ctx.has_na_cardinality(), ctx.in_dc, ctx.out_dc, ctx.capabilities) in_dc = ctx.in_dc out_dc = ctx.out_dc diff --git a/edgedb/protocol/protocol_v0.pyx b/edgedb/protocol/protocol_v0.pyx index bcde081c..a2e0c748 100644 --- a/edgedb/protocol/protocol_v0.pyx +++ b/edgedb/protocol/protocol_v0.pyx @@ -434,7 +434,7 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): ctx.out_dc = out_dc ctx.capabilities = codecs[3] - if required_one and cardinality == CARDINALITY_NOT_APPLICABLE: + if required_one and ctx.has_na_cardinality(): methname = _QUERY_SINGLE_METHOD[required_one][output_format] raise errors.InterfaceError( f'query cannot be executed with {methname}() as it ' From 356044dc8a5f5777565c41002e4c2ed402a50e2a Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 31 May 2024 12:51:37 -0400 Subject: [PATCH 6/9] Extract ExecuteContext.load_from_cache() --- edgedb/protocol/protocol.pxd | 5 +++++ edgedb/protocol/protocol.pyx | 39 ++++++++++++++++++--------------- edgedb/protocol/protocol_v0.pyx | 17 +------------- 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/edgedb/protocol/protocol.pxd b/edgedb/protocol/protocol.pxd index 9f6c652e..e47ff529 100644 --- a/edgedb/protocol/protocol.pxd +++ b/edgedb/protocol/protocol.pxd @@ -72,6 +72,10 @@ cdef class QueryCodecsCache: cdef: LRUMapping queries + cdef get( + self, str query, OutputFormat output_format, + int implicit_limit, bint inline_typenames, bint inline_typeids, + bint expect_one) cdef set(self, str query, OutputFormat output_format, int implicit_limit, bint inline_typenames, bint inline_typeids, bint expect_one, bytes cardinality, @@ -102,6 +106,7 @@ cdef class ExecuteContext: readonly uint64_t capabilities cdef inline bint has_na_cardinality(self) + cdef bint load_from_cache(self) cdef class SansIOProtocol: diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index 4da8fcd0..08f3810f 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -96,7 +96,7 @@ cdef class QueryCodecsCache: def __init__(self, *, cache_size=1000): self.queries = LRUMapping(maxsize=cache_size) - def get( + cdef get( self, str query, OutputFormat output_format, int implicit_limit, bint inline_typenames, bint inline_typeids, bint expect_one @@ -171,6 +171,21 @@ cdef class ExecuteContext: cdef inline bint has_na_cardinality(self): return self.cardinality == CARDINALITY_NOT_APPLICABLE + cdef bint load_from_cache(self): + rv = self.qc.get( + self.query, + self.output_format, + self.implicit_limit, + self.inline_typenames, + self.inline_typeids, + self.expect_one, + ) + if rv is None: + return False + else: + self.cardinality, self.in_dc, self.out_dc, self.capabilities = rv + return True + cdef class SansIOProtocol: @@ -506,19 +521,8 @@ cdef class SansIOProtocol: self.ensure_connected() self.reset_status() - codecs = qc.get( - query, - output_format, - implicit_limit, - inline_typenames, - inline_typeids, - expect_one) - - if codecs is not None: - ctx.cardinality = codecs[0] - in_dc = codecs[1] - out_dc = codecs[2] - ctx.capabilities = codecs[3] + if ctx.load_from_cache(): + pass elif not args and not kwargs and not required_one: # We don't have knowledge about the in/out desc of the command, but # the caller didn't provide any arguments, so let's try using NULL @@ -526,7 +530,7 @@ cdef class SansIOProtocol: # without an additional Parse, unless required_one is set because # it'll be too late to find out the cardinality is wrong when the # command is already executed. - in_dc = out_dc = NULL_CODEC + ctx.in_dc = ctx.out_dc = NULL_CODEC else: parsed = await self._parse(ctx) @@ -549,9 +553,8 @@ cdef class SansIOProtocol: ) ctx.cardinality = cardinality ctx.capabilities = capabilities - - ctx.in_dc = in_dc - ctx.out_dc = out_dc + ctx.in_dc = in_dc + ctx.out_dc = out_dc return await self._execute(ctx) diff --git a/edgedb/protocol/protocol_v0.pyx b/edgedb/protocol/protocol_v0.pyx index a2e0c748..fef3a029 100644 --- a/edgedb/protocol/protocol_v0.pyx +++ b/edgedb/protocol/protocol_v0.pyx @@ -375,14 +375,7 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): self.ensure_connected() self.reset_status() - codecs = qc.get( - query, - output_format, - implicit_limit, - inline_typenames, - inline_typeids, - expect_one) - if codecs is None: + if not ctx.load_from_cache(): codecs = await self._legacy_parse( query, reg=reg, @@ -426,14 +419,6 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): ret = await self._legacy_execute(in_dc, out_dc, args, kwargs) else: - cardinality = codecs[0] - in_dc = codecs[1] - out_dc = codecs[2] - ctx.cardinality = cardinality - ctx.in_dc = in_dc - ctx.out_dc = out_dc - ctx.capabilities = codecs[3] - if required_one and ctx.has_na_cardinality(): methname = _QUERY_SINGLE_METHOD[required_one][output_format] raise errors.InterfaceError( From 8d0a609de51113770d1f02f23763da267394beb0 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 31 May 2024 13:01:27 -0400 Subject: [PATCH 7/9] Extract ExecuteContext.store_to_cache() --- edgedb/protocol/protocol.pxd | 1 + edgedb/protocol/protocol.pyx | 38 ++++++++++++++------------------- edgedb/protocol/protocol_v0.pyx | 23 ++------------------ 3 files changed, 19 insertions(+), 43 deletions(-) diff --git a/edgedb/protocol/protocol.pxd b/edgedb/protocol/protocol.pxd index e47ff529..9a556e70 100644 --- a/edgedb/protocol/protocol.pxd +++ b/edgedb/protocol/protocol.pxd @@ -107,6 +107,7 @@ cdef class ExecuteContext: cdef inline bint has_na_cardinality(self) cdef bint load_from_cache(self) + cdef inline store_to_cache(self) cdef class SansIOProtocol: diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index 08f3810f..ba31ca2e 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -186,6 +186,20 @@ cdef class ExecuteContext: self.cardinality, self.in_dc, self.out_dc, self.capabilities = rv return True + cdef inline store_to_cache(self): + self.qc.set( + self.query, + self.output_format, + self.implicit_limit, + self.inline_typenames, + self.inline_typeids, + self.expect_one, + self.cardinality, + self.in_dc, + self.out_dc, + self.capabilities, + ) + cdef class SansIOProtocol: @@ -412,16 +426,7 @@ cdef class SansIOProtocol: if mtype == STMT_DATA_DESC_MSG: # our in/out type spec is out-dated self.parse_describe_type_message(ctx) - - qc.set( - query, - output_format, - implicit_limit, - inline_typenames, - inline_typeids, - expect_one, - ctx.has_na_cardinality(), - ctx.in_dc, ctx.out_dc, ctx.capabilities) + ctx.store_to_cache() in_dc = ctx.in_dc out_dc = ctx.out_dc @@ -539,22 +544,11 @@ cdef class SansIOProtocol: out_dc = parsed[2] capabilities = parsed[3] - qc.set( - query, - output_format, - implicit_limit, - inline_typenames, - inline_typeids, - expect_one, - cardinality, - in_dc, - out_dc, - capabilities, - ) ctx.cardinality = cardinality ctx.capabilities = capabilities ctx.in_dc = in_dc ctx.out_dc = out_dc + ctx.store_to_cache() return await self._execute(ctx) diff --git a/edgedb/protocol/protocol_v0.pyx b/edgedb/protocol/protocol_v0.pyx index fef3a029..3ed76349 100644 --- a/edgedb/protocol/protocol_v0.pyx +++ b/edgedb/protocol/protocol_v0.pyx @@ -285,19 +285,11 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): if capabilities is not None: capabilities = int.from_bytes(capabilities, 'big') - qc.set( - query, - output_format, - implicit_limit, - inline_typenames, - inline_typeids, - expect_one, - new_cardinality, - in_dc, out_dc, capabilities) ctx.cardinality = new_cardinality ctx.in_dc = in_dc ctx.out_dc = out_dc ctx.capabilities = capabilities + ctx.store_to_cache() re_exec = True @@ -399,22 +391,11 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): if capabilities is not None: capabilities = int.from_bytes(capabilities, 'big') - qc.set( - query, - output_format, - implicit_limit, - inline_typenames, - inline_typeids, - expect_one, - cardinality, - in_dc, - out_dc, - capabilities, - ) ctx.cardinality = cardinality ctx.in_dc = in_dc ctx.out_dc = out_dc ctx.capabilities = capabilities + ctx.store_to_cache() ret = await self._legacy_execute(in_dc, out_dc, args, kwargs) From 51ddfd87f39e77d6f4f0fe86fc0fd7e366301a25 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 31 May 2024 13:23:23 -0400 Subject: [PATCH 8/9] Simplify _parse() and _execute() --- edgedb/protocol/protocol.pyx | 80 ++++++++---------------------------- 1 file changed, 16 insertions(+), 64 deletions(-) diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index ba31ca2e..228ba75d 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -306,8 +306,6 @@ cdef class SansIOProtocol: cdef: WriteBuffer buf, params char mtype - BaseCodec in_dc = None - BaseCodec out_dc = None int16_t type_size bytes in_type_id bytes out_type_id @@ -334,9 +332,6 @@ cdef class SansIOProtocol: try: if mtype == STMT_DATA_DESC_MSG: self.parse_describe_type_message(ctx) - capabilities = ctx.capabilities - in_dc = ctx.in_dc - out_dc = ctx.out_dc elif mtype == STATE_DATA_DESC_MSG: self.parse_describe_state_message() @@ -370,8 +365,6 @@ cdef class SansIOProtocol: f'query cannot be executed with {methname}() as it ' f'does not return any data') - return ctx.cardinality, in_dc, out_dc, capabilities - async def _execute(self, ctx: ExecuteContext): cdef: WriteBuffer packet @@ -380,22 +373,6 @@ cdef class SansIOProtocol: char mtype object result - str query = ctx.query - object args = ctx.args - object kwargs = ctx.kwargs - CodecsRegistry reg = ctx.reg - QueryCodecsCache qc = ctx.qc - OutputFormat output_format = ctx.output_format - bint expect_one = ctx.expect_one - bint required_one = ctx.required_one - int implicit_limit = ctx.implicit_limit - bint inline_typenames = ctx.inline_typenames - bint inline_typeids = ctx.inline_typeids - uint64_t allow_capabilities = ctx.allow_capabilities - BaseCodec in_dc = ctx.in_dc - BaseCodec out_dc = ctx.out_dc - object state = ctx.state - params = self.encode_parse_params(ctx) buf = WriteBuffer.new_message(EXECUTE_MSG) @@ -403,10 +380,10 @@ cdef class SansIOProtocol: buf.write_buffer(params) - buf.write_bytes(in_dc.get_tid()) - buf.write_bytes(out_dc.get_tid()) + buf.write_bytes(ctx.in_dc.get_tid()) + buf.write_bytes(ctx.out_dc.get_tid()) - self.encode_args(in_dc, buf, args, kwargs) + self.encode_args(ctx.in_dc, buf, ctx.args, ctx.kwargs) buf.end_message() @@ -427,8 +404,6 @@ cdef class SansIOProtocol: # our in/out type spec is out-dated self.parse_describe_type_message(ctx) ctx.store_to_cache() - in_dc = ctx.in_dc - out_dc = ctx.out_dc elif mtype == STATE_DATA_DESC_MSG: self.parse_describe_state_message() @@ -436,7 +411,7 @@ cdef class SansIOProtocol: elif mtype == DATA_MSG: if exc is None: try: - self.parse_data_messages(out_dc, result) + self.parse_data_messages(ctx.out_dc, result) except Exception as ex: # An error during data decoding. We need to # handle this as gracefully as possible: @@ -458,19 +433,25 @@ cdef class SansIOProtocol: elif mtype == ERROR_RESPONSE_MSG: exc = self.parse_error_message() - exc._query = query + exc._query = ctx.query if exc.get_code() == parameter_type_mismatch_code: - if not isinstance(in_dc, NullCodec): + if not isinstance(ctx.in_dc, NullCodec): buf = WriteBuffer.new() try: - self.encode_args(in_dc, buf, args, kwargs) + self.encode_args( + ctx.in_dc, buf, ctx.args, ctx.kwargs + ) except errors.QueryArgumentError as ex: exc = ex finally: buf = None else: exc = self._amend_parse_error( - exc, output_format, expect_one, required_one) + exc, + ctx.output_format, + ctx.expect_one, + ctx.required_one, + ) elif mtype == READY_FOR_COMMAND_MSG: self.parse_sync_message() @@ -504,31 +485,12 @@ cdef class SansIOProtocol: return NULL_CODEC_ID, EMPTY_NULL_DATA async def execute(self, ctx: ExecuteContext): - cdef: - BaseCodec in_dc - BaseCodec out_dc - bytes cardinality - - str query = ctx.query - object args = ctx.args - object kwargs = ctx.kwargs - CodecsRegistry reg = ctx.reg - QueryCodecsCache qc = ctx.qc - OutputFormat output_format = ctx.output_format - bint expect_one = ctx.expect_one - bint required_one = ctx.required_one - int implicit_limit = ctx.implicit_limit - bint inline_typenames = ctx.inline_typenames - bint inline_typeids = ctx.inline_typeids - uint64_t allow_capabilities = ctx.allow_capabilities - object state = ctx.state - self.ensure_connected() self.reset_status() if ctx.load_from_cache(): pass - elif not args and not kwargs and not required_one: + elif not ctx.args and not ctx.kwargs and not ctx.required_one: # We don't have knowledge about the in/out desc of the command, but # the caller didn't provide any arguments, so let's try using NULL # for both in (assumed) and out (the server will correct it) desc @@ -537,17 +499,7 @@ cdef class SansIOProtocol: # command is already executed. ctx.in_dc = ctx.out_dc = NULL_CODEC else: - parsed = await self._parse(ctx) - - cardinality = parsed[0] - in_dc = parsed[1] - out_dc = parsed[2] - capabilities = parsed[3] - - ctx.cardinality = cardinality - ctx.capabilities = capabilities - ctx.in_dc = in_dc - ctx.out_dc = out_dc + await self._parse(ctx) ctx.store_to_cache() return await self._execute(ctx) From 56b28f5331f2d673ea7e58b6308947b02084dcd5 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 31 May 2024 13:32:14 -0400 Subject: [PATCH 9/9] Drop QueryCodecsCache --- edgedb/abstract.py | 2 +- edgedb/base_client.py | 5 +-- edgedb/protocol/protocol.pxd | 17 +--------- edgedb/protocol/protocol.pyx | 57 ++++++--------------------------- edgedb/protocol/protocol_v0.pyx | 2 -- 5 files changed, 14 insertions(+), 69 deletions(-) diff --git a/edgedb/abstract.py b/edgedb/abstract.py index a1c6cfa4..860275c7 100644 --- a/edgedb/abstract.py +++ b/edgedb/abstract.py @@ -49,7 +49,7 @@ class QueryWithArgs(typing.NamedTuple): class QueryCache(typing.NamedTuple): codecs_registry: protocol.CodecsRegistry - query_cache: protocol.QueryCodecsCache + query_cache: protocol.LRUMapping class QueryOptions(typing.NamedTuple): diff --git a/edgedb/base_client.py b/edgedb/base_client.py index 0272cc13..0744211c 100644 --- a/edgedb/base_client.py +++ b/edgedb/base_client.py @@ -31,6 +31,7 @@ BaseConnection_T = typing.TypeVar('BaseConnection_T', bound='BaseConnection') +QUERY_CACHE_SIZE = 1000 class BaseConnection(metaclass=abc.ABCMeta): @@ -430,7 +431,7 @@ def __init__( self._connection_factory = connection_factory self._connect_args = connect_args self._codecs_registry = protocol.CodecsRegistry() - self._query_cache = protocol.QueryCodecsCache() + self._query_cache = protocol.LRUMapping(maxsize=QUERY_CACHE_SIZE) if max_concurrency is not None and max_concurrency <= 0: raise ValueError( @@ -527,7 +528,7 @@ def set_connect_args(self, dsn=None, **connect_kwargs): connect_kwargs["dsn"] = dsn self._connect_args = connect_kwargs self._codecs_registry = protocol.CodecsRegistry() - self._query_cache = protocol.QueryCodecsCache() + self._query_cache = protocol.LRUMapping(maxsize=QUERY_CACHE_SIZE) self._working_addr = None self._working_config = None self._working_params = None diff --git a/edgedb/protocol/protocol.pxd b/edgedb/protocol/protocol.pxd index 9a556e70..402c988c 100644 --- a/edgedb/protocol/protocol.pxd +++ b/edgedb/protocol/protocol.pxd @@ -67,21 +67,6 @@ cdef enum AuthenticationStatuses: AUTH_SASL_FINAL = 12 -cdef class QueryCodecsCache: - - cdef: - LRUMapping queries - - cdef get( - self, str query, OutputFormat output_format, - int implicit_limit, bint inline_typenames, bint inline_typeids, - bint expect_one) - cdef set(self, str query, OutputFormat output_format, - int implicit_limit, bint inline_typenames, bint inline_typeids, - bint expect_one, bytes cardinality, - BaseCodec in_type, BaseCodec out_type, int capabilities) - - cdef class ExecuteContext: cdef: # Input arguments @@ -89,7 +74,7 @@ cdef class ExecuteContext: object args object kwargs CodecsRegistry reg - QueryCodecsCache qc + LRUMapping qc OutputFormat output_format bint expect_one bint required_one diff --git a/edgedb/protocol/protocol.pyx b/edgedb/protocol/protocol.pyx index 228ba75d..3e0643d3 100644 --- a/edgedb/protocol/protocol.pyx +++ b/edgedb/protocol/protocol.pyx @@ -91,47 +91,6 @@ cdef dict OLD_ERROR_CODES = { } -cdef class QueryCodecsCache: - - def __init__(self, *, cache_size=1000): - self.queries = LRUMapping(maxsize=cache_size) - - cdef get( - self, str query, OutputFormat output_format, - int implicit_limit, bint inline_typenames, bint inline_typeids, - bint expect_one - ): - key = ( - query, - output_format, - implicit_limit, - inline_typenames, - inline_typeids, - expect_one, - ) - return self.queries.get(key, None) - - cdef set( - self, str query, OutputFormat output_format, - int implicit_limit, bint inline_typenames, bint inline_typeids, - bint expect_one, bytes cardinality, - BaseCodec in_type, BaseCodec out_type, int capabilities, - ): - key = ( - query, - output_format, - implicit_limit, - inline_typenames, - inline_typeids, - expect_one, - ) - assert in_type is not None - assert out_type is not None - self.queries[key] = ( - cardinality, in_type, out_type, capabilities - ) - - cdef class ExecuteContext: def __init__( self, @@ -140,7 +99,7 @@ cdef class ExecuteContext: args, kwargs, reg: CodecsRegistry, - qc: QueryCodecsCache, + qc: LRUMapping, output_format: OutputFormat, expect_one: bool = False, required_one: bool = False, @@ -172,7 +131,7 @@ cdef class ExecuteContext: return self.cardinality == CARDINALITY_NOT_APPLICABLE cdef bint load_from_cache(self): - rv = self.qc.get( + key = ( self.query, self.output_format, self.implicit_limit, @@ -180,6 +139,7 @@ cdef class ExecuteContext: self.inline_typeids, self.expect_one, ) + rv = self.qc.get(key, None) if rv is None: return False else: @@ -187,17 +147,18 @@ cdef class ExecuteContext: return True cdef inline store_to_cache(self): - self.qc.set( + assert self.in_dc is not None + assert self.out_dc is not None + key = ( self.query, self.output_format, self.implicit_limit, self.inline_typenames, self.inline_typeids, self.expect_one, - self.cardinality, - self.in_dc, - self.out_dc, - self.capabilities, + ) + self.qc[key] = ( + self.cardinality, self.in_dc, self.out_dc, self.capabilities ) diff --git a/edgedb/protocol/protocol_v0.pyx b/edgedb/protocol/protocol_v0.pyx index 3ed76349..2a4cb80b 100644 --- a/edgedb/protocol/protocol_v0.pyx +++ b/edgedb/protocol/protocol_v0.pyx @@ -238,7 +238,6 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): object args = ctx.args object kwargs = ctx.kwargs CodecsRegistry reg = ctx.reg - QueryCodecsCache qc = ctx.qc OutputFormat output_format = ctx.output_format bint expect_one = ctx.expect_one bint required_one = ctx.required_one @@ -355,7 +354,6 @@ cdef class SansIOProtocolBackwardsCompatible(SansIOProtocol): object args = ctx.args object kwargs = ctx.kwargs CodecsRegistry reg = ctx.reg - QueryCodecsCache qc = ctx.qc OutputFormat output_format = ctx.output_format bint expect_one = ctx.expect_one bint required_one = ctx.required_one