From 908d09e0dd34762a6db688448a34d6615f6d9389 Mon Sep 17 00:00:00 2001 From: pineiden Date: Wed, 19 Aug 2020 19:07:47 -0400 Subject: [PATCH 01/12] Update to asyncio py3.8+. More safe on read-write actions Read from stream use async loop --- rethinkdb/ast.py | 151 ++++++++--------- rethinkdb/asyncio_net/net_asyncio.py | 244 ++++++++++++++------------- rethinkdb/net.py | 200 +++++++++------------- 3 files changed, 279 insertions(+), 316 deletions(-) diff --git a/rethinkdb/ast.py b/rethinkdb/ast.py index 3b9fddc6..759f75d7 100644 --- a/rethinkdb/ast.py +++ b/rethinkdb/ast.py @@ -17,7 +17,6 @@ __all__ = ["expr", "RqlQuery", "ReQLEncoder", "ReQLDecoder", "Repl"] - import base64 import binascii import collections @@ -26,7 +25,8 @@ import threading from rethinkdb import ql2_pb2 -from rethinkdb.errors import QueryPrinter, ReqlDriverCompileError, ReqlDriverError, T +from rethinkdb.errors import (QueryPrinter, ReqlDriverCompileError, + ReqlDriverError, T) P_TERM = ql2_pb2.Term.TermType @@ -77,7 +77,8 @@ def expr(val, nesting_depth=20): Convert a Python primitive into a RQL primitive value """ if not isinstance(nesting_depth, int): - raise ReqlDriverCompileError("Second argument to `r.expr` must be a number.") + raise ReqlDriverCompileError( + "Second argument to `r.expr` must be a number.") if nesting_depth <= 0: raise ReqlDriverCompileError("Nesting depth limit exceeded.") @@ -95,9 +96,7 @@ def expr(val, nesting_depth=20): timezone values with r.make_timezone(\"[+-]HH:MM\"). Alternatively, use one of ReQL's bultin time constructors, r.now, r.time, or r.iso8601. - """ - % (type(val).__name__) - ) + """ % (type(val).__name__)) return ISO8601(val.isoformat()) elif isinstance(val, RqlBinary): return Binary(val) @@ -136,12 +135,10 @@ def run(self, c=None, **global_optargs): if Repl.repl_active: raise ReqlDriverError( "RqlQuery.run must be given a connection to run on. A default connection has been set with " - "`repl()` on another thread, but not this one." - ) + "`repl()` on another thread, but not this one.") else: raise ReqlDriverError( - "RqlQuery.run must be given a connection to run on." - ) + "RqlQuery.run must be given a connection to run on.") return c._start(self, **global_optargs) @@ -392,7 +389,10 @@ def set_difference(self, *args): def __getitem__(self, index): if isinstance(index, slice): if index.stop: - return Slice(self, index.start or 0, index.stop, bracket_operator=True) + return Slice(self, + index.start or 0, + index.stop, + bracket_operator=True) else: return Slice( self, @@ -408,8 +408,7 @@ def __iter__(*args, **kwargs): raise ReqlDriverError( "__iter__ called on an RqlQuery object.\n" "To iterate over the results of a query, call run first.\n" - "To iterate inside a query, use map or for_each." - ) + "To iterate inside a query, use map or for_each.") def get_field(self, *args): return GetField(self, *args) @@ -468,7 +467,7 @@ def max(self, *args, **kwargs): def map(self, *args): if len(args) > 0: # `func_wrap` only the last argument - return Map(self, *(args[:-1] + (func_wrap(args[-1]),))) + return Map(self, *(args[:-1] + (func_wrap(args[-1]), ))) else: return Map(self) @@ -481,7 +480,8 @@ def fold(self, *args, **kwargs): kwfuncargs = {} for arg_name in kwargs: kwfuncargs[arg_name] = func_wrap(kwargs[arg_name]) - return Fold(self, *(args[:-1] + (func_wrap(args[-1]),)), **kwfuncargs) + return Fold(self, *(args[:-1] + (func_wrap(args[-1]), )), + **kwfuncargs) else: return Fold(self) @@ -492,7 +492,10 @@ def concat_map(self, *args): return ConcatMap(self, *[func_wrap(arg) for arg in args]) def order_by(self, *args, **kwargs): - args = [arg if isinstance(arg, (Asc, Desc)) else func_wrap(arg) for arg in args] + args = [ + arg if isinstance(arg, (Asc, Desc)) else func_wrap(arg) + for arg in args + ] return OrderBy(self, *args, **kwargs) def between(self, *args, **kwargs): @@ -634,12 +637,14 @@ def set_infix(self): def compose(self, args, optargs): t_args = [ - T("r.expr(", args[i], ")") if needs_wrap(self._args[i]) else args[i] + T("r.expr(", args[i], ")") + if needs_wrap(self._args[i]) else args[i] for i in xrange(len(args)) ] if self.infix: - return T("(", T(*t_args, intsp=[" ", self.statement_infix, " "]), ")") + return T("(", T(*t_args, intsp=[" ", self.statement_infix, " "]), + ")") else: return T("r.", self.statement, "(", T(*t_args, intsp=", "), ")") @@ -647,7 +652,8 @@ def compose(self, args, optargs): class RqlBiOperQuery(RqlQuery): def compose(self, args, optargs): t_args = [ - T("r.expr(", args[i], ")") if needs_wrap(self._args[i]) else args[i] + T("r.expr(", args[i], ")") + if needs_wrap(self._args[i]) else args[i] for i in xrange(len(args)) ] return T("(", T(*t_args, intsp=[" ", self.statement, " "]), ")") @@ -666,11 +672,10 @@ def __init__(self, *args, **optargs): "This is almost always a precedence error.\n" "Note that `a < b | b < c` <==> `a < (b | b) < c`.\n" "If you really want this behavior, use `.or_` or " - "`.and_` instead." - ) + "`.and_` instead.") raise ReqlDriverCompileError( - err % (self.statement, QueryPrinter(self).print_query()) - ) + err % + (self.statement, QueryPrinter(self).print_query())) except AttributeError: pass # No infix attribute, so not possible to be an infix bool operator @@ -723,7 +728,7 @@ def __init__(self, offsetstr): self.delta = datetime.timedelta(hours=hours, minutes=minutes) def __getinitargs__(self): - return (self.offsetstr,) + return (self.offsetstr, ) def __copy__(self): return RqlTzinfo(self.offsetstr) @@ -751,9 +756,8 @@ def recursively_make_hashable(obj): if isinstance(obj, list): return tuple([recursively_make_hashable(i) for i in obj]) elif isinstance(obj, dict): - return frozenset( - [(k, recursively_make_hashable(v)) for k, v in dict_items(obj)] - ) + return frozenset([(k, recursively_make_hashable(v)) + for k, v in dict_items(obj)]) return obj @@ -789,17 +793,12 @@ def __init__(self, reql_format_opts=None): def convert_time(self, obj): if "epoch_time" not in obj: raise ReqlDriverError( - ( - "pseudo-type TIME object %s does not " - + 'have expected field "epoch_time".' - ) - % json.dumps(obj) - ) + ("pseudo-type TIME object %s does not " + + 'have expected field "epoch_time".') % json.dumps(obj)) if "timezone" in obj: - return datetime.datetime.fromtimestamp( - obj["epoch_time"], RqlTzinfo(obj["timezone"]) - ) + return datetime.datetime.fromtimestamp(obj["epoch_time"], + RqlTzinfo(obj["timezone"])) else: return datetime.datetime.utcfromtimestamp(obj["epoch_time"]) @@ -807,24 +806,18 @@ def convert_time(self, obj): def convert_grouped_data(obj): if "data" not in obj: raise ReqlDriverError( - ( - "pseudo-type GROUPED_DATA object" - + ' %s does not have the expected field "data".' - ) - % json.dumps(obj) - ) - return dict([(recursively_make_hashable(k), v) for k, v in obj["data"]]) + ("pseudo-type GROUPED_DATA object" + + ' %s does not have the expected field "data".') % + json.dumps(obj)) + return dict([(recursively_make_hashable(k), v) + for k, v in obj["data"]]) @staticmethod def convert_binary(obj): if "data" not in obj: raise ReqlDriverError( - ( - "pseudo-type BINARY object %s does not have " - + 'the expected field "data".' - ) - % json.dumps(obj) - ) + ("pseudo-type BINARY object %s does not have " + + 'the expected field "data".') % json.dumps(obj)) return RqlBinary(base64.b64decode(obj["data"].encode("utf-8"))) def convert_pseudotype(self, obj): @@ -837,16 +830,14 @@ def convert_pseudotype(self, obj): return self.convert_time(obj) elif time_format != "raw": raise ReqlDriverError( - 'Unknown time_format run option "%s".' % time_format - ) + 'Unknown time_format run option "%s".' % time_format) elif reql_type == "GROUPED_DATA": group_format = self.reql_format_opts.get("group_format") if group_format is None or group_format == "native": return self.convert_grouped_data(obj) elif group_format != "raw": raise ReqlDriverError( - 'Unknown group_format run option "%s".' % group_format - ) + 'Unknown group_format run option "%s".' % group_format) elif reql_type == "GEOMETRY": # No special support for this. Just return the raw object return obj @@ -856,8 +847,8 @@ def convert_pseudotype(self, obj): return self.convert_binary(obj) elif binary_format != "raw": raise ReqlDriverError( - 'Unknown binary_format run option "%s".' % binary_format - ) + 'Unknown binary_format run option "%s".' % + binary_format) else: raise ReqlDriverError("Unknown pseudo-type %s" % reql_type) # If there was no pseudotype, or the relevant format is raw, return @@ -911,10 +902,10 @@ def build(self): def compose(self, args, optargs): return T( "r.expr({", - T( - *[T(repr(key), ": ", value) for key, value in dict_items(optargs)], - intsp=", " - ), + T(*[ + T(repr(key), ": ", value) for key, value in dict_items(optargs) + ], + intsp=", "), "})", ) @@ -1236,13 +1227,16 @@ class FunCall(RqlQuery): # before passing it down to the base class constructor. def __init__(self, *args): if len(args) == 0: - raise ReqlDriverCompileError("Expected 1 or more arguments but found 0.") + raise ReqlDriverCompileError( + "Expected 1 or more arguments but found 0.") args = [func_wrap(args[-1])] + list(args[:-1]) RqlQuery.__init__(self, *args) def compose(self, args, optargs): if len(args) != 2: - return T("r.do(", T(T(*(args[1:]), intsp=", "), args[0], intsp=", "), ")") + return T("r.do(", T(T(*(args[1:]), intsp=", "), + args[0], + intsp=", "), ")") if isinstance(self._args[1], Datum): args[1] = T("r.expr(", args[1], ")") @@ -1712,12 +1706,10 @@ def __new__(cls, *args, **kwargs): def __repr__(self): excerpt = binascii.hexlify(self[0:6]).decode("utf-8") - excerpt = " ".join([excerpt[i : i + 2] for i in xrange(0, len(excerpt), 2)]) - excerpt = ( - ", '%s%s'" % (excerpt, "..." if len(self) > 6 else "") - if len(self) > 0 - else "" - ) + excerpt = " ".join( + [excerpt[i:i + 2] for i in xrange(0, len(excerpt), 2)]) + excerpt = (", '%s%s'" % (excerpt, "..." if len(self) > 6 else "") + if len(self) > 0 else "") return "" % ( len(self), "s" if len(self) != 1 else "", @@ -1741,16 +1733,11 @@ def __init__(self, data): raise ReqlDriverCompileError( "Cannot convert a unicode string to binary, " "use `unicode.encode()` to specify the " - "encoding." - ) + "encoding.") elif not isinstance(data, bytes): raise ReqlDriverCompileError( - ( - "Cannot convert %s to binary, convert the " - "object to a `bytes` object first." - ) - % type(data).__name__ - ) + ("Cannot convert %s to binary, convert the " + "object to a `bytes` object first.") % type(data).__name__) else: self.base64_data = base64.b64encode(data) @@ -1766,7 +1753,10 @@ def compose(self, args, optargs): def build(self): if len(self._args) == 0: - return {"$reql_type$": "BINARY", "data": self.base64_data.decode("utf-8")} + return { + "$reql_type$": "BINARY", + "data": self.base64_data.decode("utf-8") + } else: return RqlTopLevelQuery.build(self) @@ -1974,10 +1964,11 @@ def __init__(self, lmbd): def compose(self, args, optargs): return T( "lambda ", - T( - *[v.compose([v._args[0].compose(None, None)], []) for v in self.vrs], - intsp=", " - ), + T(*[ + v.compose([v._args[0].compose(None, None)], []) + for v in self.vrs + ], + intsp=", "), ": ", args[1], ) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 781081e5..7502bb63 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -31,28 +31,25 @@ ) from rethinkdb.net import Connection as ConnectionBase from rethinkdb.net import Cursor, Query, Response, maybe_profile +from tasktools.taskloop import TaskLoop +from networktools.colorprint import gprint, bprint, rprint __all__ = ["Connection"] - pResponse = ql2_pb2.Response.ResponseType pQuery = ql2_pb2.Query.QueryType -@asyncio.coroutine -def _read_until(streamreader, delimiter): +async def _read_until(streamreader, delimiter): """Naive implementation of reading until a delimiter""" - buffer = bytearray() - - while True: - c = yield from streamreader.read(1) - if c == b"": - break # EOF - buffer.append(c[0]) - if c == delimiter: - break - - return bytes(buffer) + try: + result = await streamreader.readuntil(delimiter) + return bytes(result) + except asyncio.IncompleteReadError as ie: + return bytes(b"") + except asyncio.LimitOverrunError as lo: + print("Amount of data exceeds the configured stream limit") + raise lo def reusable_waiter(loop, timeout): @@ -62,20 +59,20 @@ def reusable_waiter(loop, timeout): waiter = reusable_waiter(event_loop, 10.0) while some_condition: - yield from waiter(some_future) + await waiter(some_future) """ if timeout is not None: deadline = loop.time() + timeout else: deadline = None - @asyncio.coroutine - def wait(future): + async def wait(future): if deadline is not None: new_timeout = max(deadline - loop.time(), 0) else: new_timeout = None - return (yield from asyncio.wait_for(future, new_timeout, loop=loop)) + # loop parameter deprecated on py3.8 + return (await asyncio.wait_for(future, new_timeout)) return wait @@ -101,20 +98,18 @@ def __init__(self, *args, **kwargs): def __aiter__(self): return self - @asyncio.coroutine - def __anext__(self): + async def __anext__(self): try: - return (yield from self._get_next(None)) + return (await self._get_next(None)) except ReqlCursorEmpty: raise StopAsyncIteration - @asyncio.coroutine - def close(self): + async def close(self): if self.error is None: self.error = self._empty_error() if self.conn.is_open(): self.outstanding_requests += 1 - yield from self.conn._parent._stop(self) + await self.conn._parent._stop(self) def _extend(self, res_buf): Cursor._extend(self, res_buf) @@ -123,8 +118,7 @@ def _extend(self, res_buf): # Convenience function so users know when they've hit the end of the cursor # without having to catch an exception - @asyncio.coroutine - def fetch_next(self, wait=True): + async def fetch_next(self, wait=True): timeout = Cursor._wait_to_timeout(wait) waiter = reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0 and self.error is None: @@ -132,33 +126,30 @@ def fetch_next(self, wait=True): if self.error is not None: raise self.error with translate_timeout_errors(): - yield from waiter(asyncio.shield(self.new_response)) + await waiter(asyncio.shield(self.new_response)) # If there is a (non-empty) error to be received, we return True, so the # user will receive it on the next `next` call. - return len(self.items) != 0 or not isinstance(self.error, RqlCursorEmpty) + return len(self.items) != 0 or not isinstance(self.error, + RqlCursorEmpty) def _empty_error(self): # We do not have RqlCursorEmpty inherit from StopIteration as that interferes # with mechanisms to return from a coroutine. return RqlCursorEmpty() - @asyncio.coroutine - def _get_next(self, timeout): + async def _get_next(self, timeout): waiter = reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0: self._maybe_fetch_batch() if self.error is not None: raise self.error with translate_timeout_errors(): - yield from waiter(asyncio.shield(self.new_response)) + await waiter(asyncio.shield(self.new_response)) return self.items.popleft() def _maybe_fetch_batch(self): - if ( - self.error is None - and len(self.items) < self.threshold - and self.outstanding_requests == 0 - ): + if (self.error is None and len(self.items) < self.threshold + and self.outstanding_requests == 0): self.outstanding_requests += 1 asyncio.ensure_future(self.conn._parent._continue(self)) @@ -177,6 +168,15 @@ def __init__(self, parent, io_loop=None): self._io_loop = io_loop if self._io_loop is None: self._io_loop = asyncio.get_event_loop() + asyncio.set_event_loop(self._io_loop) + + @property + def writer(self): + return self._streamwriter + + @property + def reader(self): + return self._streamreader def client_port(self): if self.is_open(): @@ -186,8 +186,11 @@ def client_address(self): if self.is_open(): return self._streamwriter.get_extra_info("sockname")[0] - @asyncio.coroutine - def connect(self, timeout): + async def read_task(self): + task = TaskLoop(self._reader, [], {}, name='reader_rdb') + task.create() + + async def connect(self, timeout): try: ssl_context = None if len(self._parent.ssl) > 0: @@ -199,23 +202,20 @@ def connect(self, timeout): ssl_context.check_hostname = True # redundant with match_hostname ssl_context.load_verify_locations(self._parent.ssl["ca_certs"]) - self._streamreader, self._streamwriter = yield from asyncio.open_connection( + self._streamreader, self._streamwriter = await asyncio.open_connection( self._parent.host, self._parent.port, loop=self._io_loop, ssl=ssl_context, ) self._streamwriter.get_extra_info("socket").setsockopt( - socket.IPPROTO_TCP, socket.TCP_NODELAY, 1 - ) + socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self._streamwriter.get_extra_info("socket").setsockopt( - socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 - ) + socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) except Exception as err: raise ReqlDriverError( - "Could not connect to %s:%s. Error: %s" - % (self._parent.host, self._parent.port, str(err)) - ) + "Could not connect to %s:%s. Error: %s" % + (self._parent.host, self._parent.port, str(err))) try: self._parent.handshake.reset() @@ -227,41 +227,39 @@ def connect(self, timeout): break # This may happen in the `V1_0` protocol where we send two requests as # an optimization, then need to read each separately - if request is not "": + if request != "": self._streamwriter.write(request) - response = yield from asyncio.wait_for( - _read_until(self._streamreader, b"\0"), - timeout, - loop=self._io_loop, - ) + response = await asyncio.wait_for( + _read_until(self._streamreader, b"\0"), timeout) response = response[:-1] except ReqlAuthError: - yield from self.close() + await self.close() raise except ReqlTimeoutError as err: - yield from self.close() + await self.close() raise ReqlDriverError( "Connection interrupted during handshake with %s:%s. Error: %s" - % (self._parent.host, self._parent.port, str(err)) - ) + % (self._parent.host, self._parent.port, str(err))) except Exception as err: - yield from self.close() + await self.close() raise ReqlDriverError( - "Could not connect to %s:%s. Error: %s" - % (self._parent.host, self._parent.port, str(err)) - ) + "Could not connect to %s:%s. Error: %s" % + (self._parent.host, self._parent.port, str(err))) # Start a parallel function to perform reads # store a reference to it so it doesn't get destroyed - self._reader_task = asyncio.ensure_future(self._reader(), loop=self._io_loop) + + self._reader_task = asyncio.run_coroutine_threadsafe( + self.read_task(), self._io_loop) + # self._reader_task = asyncio.ensure_future(self._reader(), + # loop=self._io_loop) return self._parent def is_open(self): return not (self._closing or self._streamreader.at_eof()) - @asyncio.coroutine - def close(self, noreply_wait=False, token=None, exception=None): + async def close(self, noreply_wait=False, token=None, exception=None): self._closing = True if exception is not None: err_message = "Connection is closed (%s)." % str(exception) @@ -281,67 +279,79 @@ def close(self, noreply_wait=False, token=None, exception=None): if noreply_wait: noreply = Query(pQuery.NOREPLY_WAIT, token, None, None) - yield from self.run_query(noreply, False) + await self.run_query(noreply, False) self._streamwriter.close() + await self._streamwriter.wait_closed() # We must not wait for the _reader_task if we got an exception, because that # means that we were called from it. Waiting would lead to a deadlock. if self._reader_task and exception is None: - yield from self._reader_task + await self._reader_task return None - @asyncio.coroutine - def run_query(self, query, noreply): - self._streamwriter.write(query.serialize(self._parent._get_json_encoder(query))) + async def run_query(self, query, noreply): + serialized_query = query.serialize( + self._parent._get_json_encoder(query)) + gprint("_" * 30) + bprint(f"query {serialized_query}") + gprint("_" * 30) + self._streamwriter.write(serialized_query) + await self._streamwriter.drain() if noreply: return None - - response_future = asyncio.Future() + response_future = self._io_loop.create_future() self._user_queries[query.token] = (query, response_future) - return (yield from response_future) + result = await response_future + return result # The _reader coroutine runs in parallel, reading responses # off of the socket and forwarding them to the appropriate Future or Cursor. # This is shut down as a consequence of closing the stream, or an error in the # socket/protocol from the server. Unexpected errors in this coroutine will # close the ConnectionInstance and be passed to any open Futures or Cursors. - @asyncio.coroutine - def _reader(self): + async def _reader(self, *args, **kwargs): + # now the loop is on the taskloop try: - while True: - buf = yield from self._streamreader.readexactly(12) - (token, length,) = struct.unpack(" 10 else []) - ) + [self.items[x] for x in range(min(10, len(self.items)))] + + (["..."] if len(self.items) > 10 else [])) if val_str.endswith("'...']"): - val_str = val_str[: -len("'...']")] + "...]" + val_str = val_str[:-len("'...']")] + "...]" spacer_str = "\n" if "\n" in val_str else "" if self.error is None: status_str = "streaming" @@ -271,11 +267,10 @@ def __str__(self): def __repr__(self): val_str = pprint.pformat( - [self.items[x] for x in range(min(10, len(self.items)))] - + (["..."] if len(self.items) > 10 else []) - ) + [self.items[x] for x in range(min(10, len(self.items)))] + + (["..."] if len(self.items) > 10 else [])) if val_str.endswith("'...']"): - val_str = val_str[: -len("'...']")] + "...]" + val_str = val_str[:-len("'...']")] + "...]" spacer_str = "\n" if "\n" in val_str else "" if self.error is None: status_str = "streaming" @@ -301,11 +296,8 @@ def _error(self, message): self._extend(dummy_response) def _maybe_fetch_batch(self): - if ( - self.error is None - and len(self.items) < self.threshold - and self.outstanding_requests == 0 - ): + if (self.error is None and len(self.items) < self.threshold + and self.outstanding_requests == 0): self.outstanding_requests += 1 self.conn._parent._continue(self) @@ -346,27 +338,28 @@ def __init__(self, parent, timeout): deadline = time.time() + timeout try: - self._socket = socket.create_connection((self.host, self.port), timeout) + self._socket = socket.create_connection((self.host, self.port), + timeout) self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) if len(self.ssl) > 0: try: - if hasattr( - ssl, "SSLContext" - ): # Python2.7 and 3.2+, or backports.ssl + if hasattr(ssl, "SSLContext" + ): # Python2.7 and 3.2+, or backports.ssl ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) if hasattr(ssl_context, "options"): - ssl_context.options |= getattr(ssl, "OP_NO_SSLv2", 0) - ssl_context.options |= getattr(ssl, "OP_NO_SSLv3", 0) + ssl_context.options |= getattr( + ssl, "OP_NO_SSLv2", 0) + ssl_context.options |= getattr( + ssl, "OP_NO_SSLv3", 0) ssl_context.verify_mode = ssl.CERT_REQUIRED ssl_context.check_hostname = ( True # redundant with match_hostname ) ssl_context.load_verify_locations(self.ssl["ca_certs"]) self._socket = ssl_context.wrap_socket( - self._socket, server_hostname=self.host - ) + self._socket, server_hostname=self.host) else: # this does not disable SSLv2 or SSLv3 self._socket = ssl.wrap_socket( self._socket, @@ -378,8 +371,8 @@ def __init__(self, parent, timeout): self._socket.close() if "EOF occurred in violation of protocol" in str( - err - ) or "sslv3 alert handshake failure" in str(err): + err) or "sslv3 alert handshake failure" in str( + err): # probably on an older version of OpenSSL raise ReqlDriverError( "SSL handshake failed, likely because Python is linked against an old version of OpenSSL " @@ -387,15 +380,14 @@ def __init__(self, parent, timeout): "around by lowering the security setting on the server with the options " "`--tls-min-protocol TLSv1 --tls-ciphers " "EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH:AES256-SHA` (see server log for more " - "information): %s" % str(err) - ) + "information): %s" % str(err)) else: raise ReqlDriverError( "SSL handshake failed (see server log for more information): %s" - % str(err) - ) + % str(err)) try: - match_hostname(self._socket.getpeercert(), hostname=self.host) + match_hostname(self._socket.getpeercert(), + hostname=self.host) except CertificateError: self._socket.close() raise @@ -408,7 +400,7 @@ def __init__(self, parent, timeout): break # This may happen in the `V1_0` protocol where we send two requests as # an optimization, then need to read each separately - if request is not "": + if request != "": self.sendall(request) # The response from the server is a null-terminated string @@ -423,21 +415,18 @@ def __init__(self, parent, timeout): raise except ReqlDriverError as ex: self.close() - error = ( - str(ex) - .replace("receiving from", "during handshake with") - .replace("sending to", "during handshake with") - ) + error = (str(ex).replace("receiving from", + "during handshake with").replace( + "sending to", + "during handshake with")) raise ReqlDriverError(error) except socket.timeout as ex: self.close() raise ReqlTimeoutError(self.host, self.port) except Exception as ex: self.close() - raise ReqlDriverError( - "Could not connect to %s:%s. Error: %s" - % (self.host, self.port, str(ex)) - ) + raise ReqlDriverError("Could not connect to %s:%s. Error: %s" % + (self.host, self.port, str(ex))) def is_open(self): return self._socket is not None @@ -476,16 +465,13 @@ def recvall(self, length, deadline): # This should only happen with a timeout of 0 raise ReqlTimeoutError(self.host, self.port) elif ex.errno != errno.EINTR: - raise ReqlDriverError( - ("Connection interrupted " + "receiving from %s:%s - %s") - % (self.host, self.port, str(ex)) - ) + raise ReqlDriverError(("Connection interrupted " + + "receiving from %s:%s - %s") % + (self.host, self.port, str(ex))) except Exception as ex: self.close() - raise ReqlDriverError( - "Error receiving from %s:%s - %s" - % (self.host, self.port, str(ex)) - ) + raise ReqlDriverError("Error receiving from %s:%s - %s" % + (self.host, self.port, str(ex))) if len(chunk) == 0: self.close() @@ -505,14 +491,12 @@ def sendall(self, data): elif ex.errno != errno.EINTR: self.close() raise ReqlDriverError( - ("Connection interrupted " + "sending to %s:%s - %s") - % (self.host, self.port, str(ex)) - ) + ("Connection interrupted " + "sending to %s:%s - %s") % + (self.host, self.port, str(ex))) except Exception as ex: self.close() - raise ReqlDriverError( - "Error sending to %s:%s - %s" % (self.host, self.port, str(ex)) - ) + raise ReqlDriverError("Error sending to %s:%s - %s" % + (self.host, self.port, str(ex))) except BaseException: self.close() raise @@ -558,7 +542,8 @@ def close(self, noreply_wait=False, token=None): self._header_in_progress = None def run_query(self, query, noreply): - self._socket.sendall(query.serialize(self._parent._get_json_encoder(query))) + self._socket.sendall( + query.serialize(self._parent._get_json_encoder(query))) if noreply: return None @@ -567,7 +552,8 @@ def run_query(self, query, noreply): if res.type == pResponse.SUCCESS_ATOM: return maybe_profile(res.data[0], res) - elif res.type in (pResponse.SUCCESS_PARTIAL, pResponse.SUCCESS_SEQUENCE): + elif res.type in (pResponse.SUCCESS_PARTIAL, + pResponse.SUCCESS_SEQUENCE): cursor = DefaultCursor(self, query, res) return maybe_profile(cursor, res) elif res.type == pResponse.WAIT_COMPLETE: @@ -587,8 +573,12 @@ def _read_response(self, query, deadline=None): # of this response. The next 4 bytes give the # expected length of this response. if self._header_in_progress is None: - self._header_in_progress = self._socket.recvall(12, deadline) - (res_token, res_len,) = struct.unpack(" Date: Wed, 19 Aug 2020 20:24:11 -0400 Subject: [PATCH 02/12] erased prints for test --- rethinkdb/asyncio_net/net_asyncio.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 7502bb63..81da1c51 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -32,7 +32,6 @@ from rethinkdb.net import Connection as ConnectionBase from rethinkdb.net import Cursor, Query, Response, maybe_profile from tasktools.taskloop import TaskLoop -from networktools.colorprint import gprint, bprint, rprint __all__ = ["Connection"] @@ -293,9 +292,6 @@ async def close(self, noreply_wait=False, token=None, exception=None): async def run_query(self, query, noreply): serialized_query = query.serialize( self._parent._get_json_encoder(query)) - gprint("_" * 30) - bprint(f"query {serialized_query}") - gprint("_" * 30) self._streamwriter.write(serialized_query) await self._streamwriter.drain() if noreply: From 249df5d1d15b27a17bec055981e42ba569c4b40b Mon Sep 17 00:00:00 2001 From: pineiden Date: Thu, 20 Aug 2020 13:04:34 -0400 Subject: [PATCH 03/12] Some adjustments for merge --- rethinkdb/ast.py | 23 ++++++++++++----------- rethinkdb/asyncio_net/net_asyncio.py | 20 ++++++++++++-------- rethinkdb/net.py | 4 ++-- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/rethinkdb/ast.py b/rethinkdb/ast.py index 759f75d7..54405bea 100644 --- a/rethinkdb/ast.py +++ b/rethinkdb/ast.py @@ -152,7 +152,7 @@ def __repr__(self): # Compile this query to a json-serializable object def build(self): res = [self.term_type, self._args] - if len(self.optargs) > 0: + if self.optargs: res.append(self.optargs) return res @@ -643,8 +643,8 @@ def compose(self, args, optargs): ] if self.infix: - return T("(", T(*t_args, intsp=[" ", self.statement_infix, " "]), - ")") + t_args = T(*t_args, intsp=[" ", self.statement_infix, " "]) + return T("(", t_args, ")") else: return T("r.", self.statement, "(", T(*t_args, intsp=", "), ")") @@ -900,12 +900,13 @@ def build(self): return self.optargs def compose(self, args, optargs): + list_comp = [ + T(repr(key), ": ", value) for key, value in dict_items(optargs) + ] + t_value = T(*list_compt, intsp=", ") return T( "r.expr({", - T(*[ - T(repr(key), ": ", value) for key, value in dict_items(optargs) - ], - intsp=", "), + t_value, "})", ) @@ -1708,8 +1709,8 @@ def __repr__(self): excerpt = binascii.hexlify(self[0:6]).decode("utf-8") excerpt = " ".join( [excerpt[i:i + 2] for i in xrange(0, len(excerpt), 2)]) - excerpt = (", '%s%s'" % (excerpt, "..." if len(self) > 6 else "") - if len(self) > 0 else "") + excerpt = (", '%s%s'" % + (excerpt, "..." if len(self) > 6 else "") if self else "") return "" % ( len(self), "s" if len(self) != 1 else "", @@ -1746,13 +1747,13 @@ def __init__(self, data): self.optargs = {} def compose(self, args, optargs): - if len(self._args) == 0: + if self._args: return T("r.", self.statement, "(bytes())") else: return RqlTopLevelQuery.compose(self, args, optargs) def build(self): - if len(self._args) == 0: + if self._args: return { "$reql_type$": "BINARY", "data": self.base64_data.decode("utf-8") diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 81da1c51..5844015f 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -44,11 +44,11 @@ async def _read_until(streamreader, delimiter): try: result = await streamreader.readuntil(delimiter) return bytes(result) - except asyncio.IncompleteReadError as ie: - return bytes(b"") - except asyncio.LimitOverrunError as lo: + except asyncio.IncompleteReadError as ie_error: + return bytes(ie_error.partial) + except asyncio.LimitOverrunError as lo_error: print("Amount of data exceeds the configured stream limit") - raise lo + raise lo_error def reusable_waiter(loop, timeout): @@ -71,7 +71,8 @@ async def wait(future): else: new_timeout = None # loop parameter deprecated on py3.8 - return (await asyncio.wait_for(future, new_timeout)) + result = await asyncio.wait_for(future, new_timeout) + return result return wait @@ -368,16 +369,19 @@ async def __aexit__(self, exception_type, exception_val, traceback): async def _stop(self, cursor): self.check_open() q = Query(pQuery.STOP, cursor.query.token, None, None) - return (await self._instance.run_query(q, True)) + result = await self._instance.run_query(q, True) + return result async def reconnect(self, noreply_wait=True, timeout=None): # We close before reconnect so reconnect doesn't try to close us # and then fail to return the Future (this is a little awkward). await self.close(noreply_wait) self._instance = self._conn_type(self, **self._child_kwargs) - return (await self._instance.connect(timeout)) + result = await self._instance.connect(timeout) + return result async def close(self, noreply_wait=True): if self._instance is None: return None - return (await self._instance.close()) + result = await self._instance.close() + return result diff --git a/rethinkdb/net.py b/rethinkdb/net.py index a8427d25..f0542814 100644 --- a/rethinkdb/net.py +++ b/rethinkdb/net.py @@ -345,8 +345,8 @@ def __init__(self, parent, timeout): if len(self.ssl) > 0: try: - if hasattr(ssl, "SSLContext" - ): # Python2.7 and 3.2+, or backports.ssl + if hasattr(ssl, "SSLContext"): + # Python2.7 and 3.2+, or backports.ssl ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) if hasattr(ssl_context, "options"): ssl_context.options |= getattr( From 0c709ebd5b7a631dce8a571ef742723692304966 Mon Sep 17 00:00:00 2001 From: pineiden Date: Tue, 25 Aug 2020 12:54:58 -0400 Subject: [PATCH 04/12] RqlExceptionError sended by kwargs results on def _read, to be handled by tasktools --- rethinkdb/asyncio_net/net_asyncio.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 5844015f..445c9e35 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -309,6 +309,7 @@ async def run_query(self, query, noreply): # close the ConnectionInstance and be passed to any open Futures or Cursors. async def _reader(self, *args, **kwargs): # now the loop is on the taskloop + this_task = asyncio.current_task() try: nbytes = 12 buf = b"" @@ -344,8 +345,14 @@ async def _reader(self, *args, **kwargs): future.set_exception(res.make_error(query)) del self._user_queries[token] elif not self._closing: - raise ReqlDriverError("Unexpected response received.") + """ + This error must be handled by TaskLoop to preserve the loop + """ + kwargs["exception"] = ReqlDriverError( + "Unexpected response received.") except Exception as ex: + result = args, kwargs + this_task.set_result(result) if not self._closing: await self.close(exception=ex) return args, kwargs From 99aae643d46a7891306fd56a0f52f1edf0d9bfa3 Mon Sep 17 00:00:00 2001 From: pineiden Date: Wed, 26 Aug 2020 10:40:03 -0400 Subject: [PATCH 05/12] manage future excpetion, to wrap_future(future) --- rethinkdb/asyncio_net/net_asyncio.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 445c9e35..649ec252 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -286,21 +286,26 @@ async def close(self, noreply_wait=False, token=None, exception=None): # We must not wait for the _reader_task if we got an exception, because that # means that we were called from it. Waiting would lead to a deadlock. if self._reader_task and exception is None: - await self._reader_task + await asyncio.wrap_future(self._reader_task) return None async def run_query(self, query, noreply): - serialized_query = query.serialize( - self._parent._get_json_encoder(query)) - self._streamwriter.write(serialized_query) - await self._streamwriter.drain() - if noreply: - return None - response_future = self._io_loop.create_future() - self._user_queries[query.token] = (query, response_future) - result = await response_future - return result + try: + serialized_query = query.serialize( + self._parent._get_json_encoder(query)) + self._streamwriter.write(serialized_query) + await self._streamwriter.drain() + if noreply: + return None + response_future = self._io_loop.create_future() + self._user_queries[query.token] = (query, response_future) + result = await response_future + return result + except asyncio.CancelledError as c_error: + raise c_error + except Exception as e: + raise e # The _reader coroutine runs in parallel, reading responses # off of the socket and forwarding them to the appropriate Future or Cursor. From 2b720ea4b8e0250a91f877cfba3810a4a7217201 Mon Sep 17 00:00:00 2001 From: pineiden Date: Wed, 26 Aug 2020 11:30:35 -0400 Subject: [PATCH 06/12] Fixings for pull request --- rethinkdb/ast.py | 15 +++++++-------- rethinkdb/asyncio_net/net_asyncio.py | 7 ++++--- rethinkdb/net.py | 10 +++++----- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/rethinkdb/ast.py b/rethinkdb/ast.py index 54405bea..b7e7ea73 100644 --- a/rethinkdb/ast.py +++ b/rethinkdb/ast.py @@ -629,6 +629,7 @@ def needs_wrap(arg): class RqlBoolOperQuery(RqlQuery): def __init__(self, *args, **optargs): + super().__init__(*args, **kwargs) self.infix = False RqlQuery.__init__(self, *args, **optargs) @@ -903,7 +904,7 @@ def compose(self, args, optargs): list_comp = [ T(repr(key), ": ", value) for key, value in dict_items(optargs) ] - t_value = T(*list_compt, intsp=", ") + t_value = T(*list_comp, intsp=", ") return T( "r.expr({", t_value, @@ -1749,8 +1750,7 @@ def __init__(self, data): def compose(self, args, optargs): if self._args: return T("r.", self.statement, "(bytes())") - else: - return RqlTopLevelQuery.compose(self, args, optargs) + return RqlTopLevelQuery.compose(self, args, optargs) def build(self): if self._args: @@ -1963,13 +1963,12 @@ def __init__(self, lmbd): self._args.extend([MakeArray(*vrids), expr(lmbd(*vrs))]) def compose(self, args, optargs): + list_comp = [ + v.compose([v._args[0].compose(None, None)], []) for v in self.vrs + ] return T( "lambda ", - T(*[ - v.compose([v._args[0].compose(None, None)], []) - for v in self.vrs - ], - intsp=", "), + T(*list_comp, intsp=", "), ": ", args[1], ) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 649ec252..09de6bde 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -100,7 +100,8 @@ def __aiter__(self): async def __anext__(self): try: - return (await self._get_next(None)) + result = await self._get_next(None) + return result except ReqlCursorEmpty: raise StopAsyncIteration @@ -304,8 +305,8 @@ async def run_query(self, query, noreply): return result except asyncio.CancelledError as c_error: raise c_error - except Exception as e: - raise e + except Exception as error: + raise error # The _reader coroutine runs in parallel, reading responses # off of the socket and forwarding them to the appropriate Future or Cursor. diff --git a/rethinkdb/net.py b/rethinkdb/net.py index f0542814..3ac1e20c 100644 --- a/rethinkdb/net.py +++ b/rethinkdb/net.py @@ -610,7 +610,7 @@ class Connection(object): _json_encoder = ReQLEncoder def __init__(self, conn_type, host, port, db, auth_key, user, password, - timeout, ssl, _handshake_version, **kwargs): + timeout, _ssl, _handshake_version, **kwargs): self.db = db self.host = host @@ -622,7 +622,7 @@ def __init__(self, conn_type, host, port, db, auth_key, user, password, self.connect_timeout = timeout - self.ssl = ssl + self.ssl = _ssl self._conn_type = conn_type self._child_kwargs = kwargs @@ -753,7 +753,7 @@ def __init__(self, *args, **kwargs): def make_connection(connection_type, host=None, port=None, - db=None, + db_name=None, auth_key=None, user=None, password=None, @@ -793,6 +793,6 @@ def make_connection(connection_type, if not password and not password is None: password = None - conn = connection_type(host, port, db, auth_key, user, password, timeout, - ssl, _handshake_version, **kwargs) + conn = connection_type(host, port, db_name, auth_key, user, password, + timeout, ssl, _handshake_version, **kwargs) return conn.reconnect(timeout=timeout) From 2ca23049a6a0d1cb8980fe1740c92e692331b356 Mon Sep 17 00:00:00 2001 From: pineiden Date: Wed, 26 Aug 2020 12:02:18 -0400 Subject: [PATCH 07/12] extra fixes... kwargs->optargs --- rethinkdb/ast.py | 8 ++++---- rethinkdb/net.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rethinkdb/ast.py b/rethinkdb/ast.py index b7e7ea73..7a1b1fdc 100644 --- a/rethinkdb/ast.py +++ b/rethinkdb/ast.py @@ -628,10 +628,11 @@ def needs_wrap(arg): class RqlBoolOperQuery(RqlQuery): + statement_infix = None + def __init__(self, *args, **optargs): - super().__init__(*args, **kwargs) + super().__init__(*args, **optargs) self.infix = False - RqlQuery.__init__(self, *args, **optargs) def set_infix(self): self.infix = True @@ -1758,8 +1759,7 @@ def build(self): "$reql_type$": "BINARY", "data": self.base64_data.decode("utf-8") } - else: - return RqlTopLevelQuery.build(self) + return RqlTopLevelQuery.build(self) class Range(RqlTopLevelQuery): diff --git a/rethinkdb/net.py b/rethinkdb/net.py index 3ac1e20c..0c9b9a1e 100644 --- a/rethinkdb/net.py +++ b/rethinkdb/net.py @@ -609,9 +609,9 @@ class Connection(object): _json_decoder = ReQLDecoder _json_encoder = ReQLEncoder - def __init__(self, conn_type, host, port, db, auth_key, user, password, - timeout, _ssl, _handshake_version, **kwargs): - self.db = db + def __init__(self, conn_type, host, port, db_name, auth_key, user, + password, timeout, _ssl, _handshake_version, **kwargs): + self.db = db_name self.host = host try: @@ -771,7 +771,7 @@ def make_connection(connection_type, host = connection_string.hostname port = connection_string.port - db = connection_string.path.replace("/", "") or None + db_name = connection_string.path.replace("/", "") or None auth_key = query_string.get("auth_key") timeout = query_string.get("timeout") From 2e0e9a23f58a14a750dae2ae652e4c1e587dcf5f Mon Sep 17 00:00:00 2001 From: pineiden Date: Wed, 26 Aug 2020 17:24:58 -0400 Subject: [PATCH 08/12] self.db -> self.db_name on net.py --- rethinkdb/net.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rethinkdb/net.py b/rethinkdb/net.py index 0c9b9a1e..61c8b41e 100644 --- a/rethinkdb/net.py +++ b/rethinkdb/net.py @@ -611,7 +611,7 @@ class Connection(object): def __init__(self, conn_type, host, port, db_name, auth_key, user, password, timeout, _ssl, _handshake_version, **kwargs): - self.db = db_name + self.db_name = db_name self.host = host try: @@ -687,7 +687,7 @@ def __exit__(self, type, value, traceback): self.close(noreply_wait=False) def use(self, db): - self.db = db + self.db_name = db def is_open(self): return self._instance is not None and self._instance.is_open() @@ -721,8 +721,8 @@ def _new_token(self): def _start(self, term, **global_optargs): self.check_open() - if "db" in global_optargs or self.db is not None: - global_optargs["db"] = DB(global_optargs.get("db", self.db)) + if "db" in global_optargs or self.db_name is not None: + global_optargs["db"] = DB(global_optargs.get("db", self.db_name)) q = Query(pQuery.START, self._new_token(), term, global_optargs) return self._instance.run_query(q, global_optargs.get("noreply", False)) From ee122e0f1d149c925e86222d07f20c8089de64a4 Mon Sep 17 00:00:00 2001 From: pineiden Date: Tue, 1 Sep 2020 13:21:57 -0400 Subject: [PATCH 09/12] Task cannot do a 'set_result' method (only futures), removed --- rethinkdb/asyncio_net/net_asyncio.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 09de6bde..ec027fa8 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -357,8 +357,7 @@ async def _reader(self, *args, **kwargs): kwargs["exception"] = ReqlDriverError( "Unexpected response received.") except Exception as ex: - result = args, kwargs - this_task.set_result(result) + print(f"Exception on _reader, {ex}") if not self._closing: await self.close(exception=ex) return args, kwargs From 71a3dc4963844a92904db3e4b35cf16206a6246b Mon Sep 17 00:00:00 2001 From: pineiden Date: Thu, 3 Sep 2020 12:31:09 -0400 Subject: [PATCH 10/12] On _reader must 'raise Exception' if exists, with that the caller can cancel or close the connection. --- rethinkdb/asyncio_net/net_asyncio.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index ec027fa8..453b7390 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -360,6 +360,7 @@ async def _reader(self, *args, **kwargs): print(f"Exception on _reader, {ex}") if not self._closing: await self.close(exception=ex) + raise ex return args, kwargs From 062b2e7f78476aab8192beee4b3d81ba11eef073 Mon Sep 17 00:00:00 2001 From: pineiden Date: Sun, 6 Sep 2020 16:22:54 -0300 Subject: [PATCH 11/12] Added exception incomplete read on _read coroutine --- rethinkdb/asyncio_net/net_asyncio.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 453b7390..7c67849e 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -356,6 +356,11 @@ async def _reader(self, *args, **kwargs): """ kwargs["exception"] = ReqlDriverError( "Unexpected response received.") + except asyncio.IncompleteReadError as read_error: + print(f"Exception on _reader, incomplete_read, {read_error}") + if not self._closing: + await self.close(exception=ex) + raise read_error except Exception as ex: print(f"Exception on _reader, {ex}") if not self._closing: From 46c53a8f0ca2dd6b07654ed6721e0d6d3540ca20 Mon Sep 17 00:00:00 2001 From: pineiden Date: Wed, 18 Nov 2020 13:46:05 -0300 Subject: [PATCH 12/12] corrections for pass pr --- .bandit | 0 .coveragerc | 0 .editorconfig | 0 .github/FUNDING.yml | 0 .github/ISSUE_TEMPLATE/bug_report.md | 0 .github/ISSUE_TEMPLATE/feature_request.md | 0 .github/PULL_REQUEST_TEMPLATE.md | 0 .gitignore | 0 .travis.yml | 0 CODE_OF_CONDUCT.md | 0 CONTRIBUTING.md | 0 LICENSE | 0 MANIFEST.in | 0 Makefile | 0 README.md | 0 pytest.ini | 0 requirements.txt | 0 rethinkdb/__init__.py | 0 rethinkdb/__main__.py | 0 rethinkdb/ast.py | 0 rethinkdb/asyncio_net/__init__.py | 0 rethinkdb/asyncio_net/net_asyncio.py | 14 +++++++------- rethinkdb/backports/__init__.py | 0 rethinkdb/backports/ssl_match_hostname/LICENSE.txt | 0 rethinkdb/backports/ssl_match_hostname/README.txt | 0 rethinkdb/backports/ssl_match_hostname/__init__.py | 0 rethinkdb/docs.py | 0 rethinkdb/errors.py | 0 rethinkdb/gevent_net/__init__.py | 0 rethinkdb/gevent_net/net_gevent.py | 0 rethinkdb/handshake.py | 0 rethinkdb/helpers.py | 0 rethinkdb/logger.py | 0 rethinkdb/net.py | 0 rethinkdb/query.py | 0 rethinkdb/tornado_net/__init__.py | 0 rethinkdb/tornado_net/net_tornado.py | 0 rethinkdb/trio_net/__init__.py | 0 rethinkdb/trio_net/net_trio.py | 0 rethinkdb/twisted_net/__init__.py | 0 rethinkdb/twisted_net/net_twisted.py | 0 rethinkdb/utils_common.py | 0 rethinkdb/version.py | 0 scripts/convert_protofile.py | 0 scripts/prepare_remote_test.py | 0 scripts/upload-coverage.sh | 0 scripts/upload-pypi.sh | 0 setup.cfg | 0 setup.py | 0 tests/__init__.py | 0 tests/conftest.py | 0 tests/helpers.py | 0 tests/integration/__init__.py | 0 tests/integration/test_asyncio.py | 0 tests/integration/test_connect.py | 0 tests/integration/test_cursor.py | 0 tests/integration/test_data_write.py | 0 tests/integration/test_database.py | 0 tests/integration/test_date_and_time.py | 0 tests/integration/test_index.py | 0 tests/integration/test_ping.py | 0 tests/integration/test_repl.py | 0 tests/integration/test_table.py | 0 tests/integration/test_tornado.py | 0 tests/integration/test_trio.py | 0 tests/integration/test_write_hooks.py | 0 tests/test_date_and_time.py | 0 tests/test_handshake.py | 0 tests/test_helpers.py | 0 tests/test_logger.py | 0 tests/test_net.py | 0 tests/test_utils_common.py | 0 72 files changed, 7 insertions(+), 7 deletions(-) mode change 100644 => 100755 .bandit mode change 100644 => 100755 .coveragerc mode change 100644 => 100755 .editorconfig mode change 100644 => 100755 .github/FUNDING.yml mode change 100644 => 100755 .github/ISSUE_TEMPLATE/bug_report.md mode change 100644 => 100755 .github/ISSUE_TEMPLATE/feature_request.md mode change 100644 => 100755 .github/PULL_REQUEST_TEMPLATE.md mode change 100644 => 100755 .gitignore mode change 100644 => 100755 .travis.yml mode change 100644 => 100755 CODE_OF_CONDUCT.md mode change 100644 => 100755 CONTRIBUTING.md mode change 100644 => 100755 LICENSE mode change 100644 => 100755 MANIFEST.in mode change 100644 => 100755 Makefile mode change 100644 => 100755 README.md mode change 100644 => 100755 pytest.ini mode change 100644 => 100755 requirements.txt mode change 100644 => 100755 rethinkdb/__init__.py mode change 100644 => 100755 rethinkdb/__main__.py mode change 100644 => 100755 rethinkdb/ast.py mode change 100644 => 100755 rethinkdb/asyncio_net/__init__.py mode change 100644 => 100755 rethinkdb/asyncio_net/net_asyncio.py mode change 100644 => 100755 rethinkdb/backports/__init__.py mode change 100644 => 100755 rethinkdb/backports/ssl_match_hostname/LICENSE.txt mode change 100644 => 100755 rethinkdb/backports/ssl_match_hostname/README.txt mode change 100644 => 100755 rethinkdb/backports/ssl_match_hostname/__init__.py mode change 100644 => 100755 rethinkdb/docs.py mode change 100644 => 100755 rethinkdb/errors.py mode change 100644 => 100755 rethinkdb/gevent_net/__init__.py mode change 100644 => 100755 rethinkdb/gevent_net/net_gevent.py mode change 100644 => 100755 rethinkdb/handshake.py mode change 100644 => 100755 rethinkdb/helpers.py mode change 100644 => 100755 rethinkdb/logger.py mode change 100644 => 100755 rethinkdb/net.py mode change 100644 => 100755 rethinkdb/query.py mode change 100644 => 100755 rethinkdb/tornado_net/__init__.py mode change 100644 => 100755 rethinkdb/tornado_net/net_tornado.py mode change 100644 => 100755 rethinkdb/trio_net/__init__.py mode change 100644 => 100755 rethinkdb/trio_net/net_trio.py mode change 100644 => 100755 rethinkdb/twisted_net/__init__.py mode change 100644 => 100755 rethinkdb/twisted_net/net_twisted.py mode change 100644 => 100755 rethinkdb/utils_common.py mode change 100644 => 100755 rethinkdb/version.py mode change 100644 => 100755 scripts/convert_protofile.py mode change 100644 => 100755 scripts/prepare_remote_test.py mode change 100644 => 100755 scripts/upload-coverage.sh mode change 100644 => 100755 scripts/upload-pypi.sh mode change 100644 => 100755 setup.cfg mode change 100644 => 100755 setup.py mode change 100644 => 100755 tests/__init__.py mode change 100644 => 100755 tests/conftest.py mode change 100644 => 100755 tests/helpers.py mode change 100644 => 100755 tests/integration/__init__.py mode change 100644 => 100755 tests/integration/test_asyncio.py mode change 100644 => 100755 tests/integration/test_connect.py mode change 100644 => 100755 tests/integration/test_cursor.py mode change 100644 => 100755 tests/integration/test_data_write.py mode change 100644 => 100755 tests/integration/test_database.py mode change 100644 => 100755 tests/integration/test_date_and_time.py mode change 100644 => 100755 tests/integration/test_index.py mode change 100644 => 100755 tests/integration/test_ping.py mode change 100644 => 100755 tests/integration/test_repl.py mode change 100644 => 100755 tests/integration/test_table.py mode change 100644 => 100755 tests/integration/test_tornado.py mode change 100644 => 100755 tests/integration/test_trio.py mode change 100644 => 100755 tests/integration/test_write_hooks.py mode change 100644 => 100755 tests/test_date_and_time.py mode change 100644 => 100755 tests/test_handshake.py mode change 100644 => 100755 tests/test_helpers.py mode change 100644 => 100755 tests/test_logger.py mode change 100644 => 100755 tests/test_net.py mode change 100644 => 100755 tests/test_utils_common.py diff --git a/.bandit b/.bandit old mode 100644 new mode 100755 diff --git a/.coveragerc b/.coveragerc old mode 100644 new mode 100755 diff --git a/.editorconfig b/.editorconfig old mode 100644 new mode 100755 diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml old mode 100644 new mode 100755 diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md old mode 100644 new mode 100755 diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md old mode 100644 new mode 100755 diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md old mode 100644 new mode 100755 diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/.travis.yml b/.travis.yml old mode 100644 new mode 100755 diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md old mode 100644 new mode 100755 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md old mode 100644 new mode 100755 diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/MANIFEST.in b/MANIFEST.in old mode 100644 new mode 100755 diff --git a/Makefile b/Makefile old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/pytest.ini b/pytest.ini old mode 100644 new mode 100755 diff --git a/requirements.txt b/requirements.txt old mode 100644 new mode 100755 diff --git a/rethinkdb/__init__.py b/rethinkdb/__init__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/__main__.py b/rethinkdb/__main__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/ast.py b/rethinkdb/ast.py old mode 100644 new mode 100755 diff --git a/rethinkdb/asyncio_net/__init__.py b/rethinkdb/asyncio_net/__init__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py old mode 100644 new mode 100755 index 7c67849e..28ff8e8e --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -315,7 +315,6 @@ async def run_query(self, query, noreply): # close the ConnectionInstance and be passed to any open Futures or Cursors. async def _reader(self, *args, **kwargs): # now the loop is on the taskloop - this_task = asyncio.current_task() try: nbytes = 12 buf = b"" @@ -351,18 +350,13 @@ async def _reader(self, *args, **kwargs): future.set_exception(res.make_error(query)) del self._user_queries[token] elif not self._closing: - """ - This error must be handled by TaskLoop to preserve the loop - """ kwargs["exception"] = ReqlDriverError( "Unexpected response received.") except asyncio.IncompleteReadError as read_error: - print(f"Exception on _reader, incomplete_read, {read_error}") if not self._closing: - await self.close(exception=ex) + await self.close(exception=read_error) raise read_error except Exception as ex: - print(f"Exception on _reader, {ex}") if not self._closing: await self.close(exception=ex) raise ex @@ -379,9 +373,15 @@ def __init__(self, *args, **kwargs): self.port) async def __aenter__(self): + """ + async context manager enter + """ return self async def __aexit__(self, exception_type, exception_val, traceback): + """ + async context manager exit + """ await self.close(False) async def _stop(self, cursor): diff --git a/rethinkdb/backports/__init__.py b/rethinkdb/backports/__init__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/backports/ssl_match_hostname/LICENSE.txt b/rethinkdb/backports/ssl_match_hostname/LICENSE.txt old mode 100644 new mode 100755 diff --git a/rethinkdb/backports/ssl_match_hostname/README.txt b/rethinkdb/backports/ssl_match_hostname/README.txt old mode 100644 new mode 100755 diff --git a/rethinkdb/backports/ssl_match_hostname/__init__.py b/rethinkdb/backports/ssl_match_hostname/__init__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/docs.py b/rethinkdb/docs.py old mode 100644 new mode 100755 diff --git a/rethinkdb/errors.py b/rethinkdb/errors.py old mode 100644 new mode 100755 diff --git a/rethinkdb/gevent_net/__init__.py b/rethinkdb/gevent_net/__init__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/gevent_net/net_gevent.py b/rethinkdb/gevent_net/net_gevent.py old mode 100644 new mode 100755 diff --git a/rethinkdb/handshake.py b/rethinkdb/handshake.py old mode 100644 new mode 100755 diff --git a/rethinkdb/helpers.py b/rethinkdb/helpers.py old mode 100644 new mode 100755 diff --git a/rethinkdb/logger.py b/rethinkdb/logger.py old mode 100644 new mode 100755 diff --git a/rethinkdb/net.py b/rethinkdb/net.py old mode 100644 new mode 100755 diff --git a/rethinkdb/query.py b/rethinkdb/query.py old mode 100644 new mode 100755 diff --git a/rethinkdb/tornado_net/__init__.py b/rethinkdb/tornado_net/__init__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/tornado_net/net_tornado.py b/rethinkdb/tornado_net/net_tornado.py old mode 100644 new mode 100755 diff --git a/rethinkdb/trio_net/__init__.py b/rethinkdb/trio_net/__init__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/trio_net/net_trio.py b/rethinkdb/trio_net/net_trio.py old mode 100644 new mode 100755 diff --git a/rethinkdb/twisted_net/__init__.py b/rethinkdb/twisted_net/__init__.py old mode 100644 new mode 100755 diff --git a/rethinkdb/twisted_net/net_twisted.py b/rethinkdb/twisted_net/net_twisted.py old mode 100644 new mode 100755 diff --git a/rethinkdb/utils_common.py b/rethinkdb/utils_common.py old mode 100644 new mode 100755 diff --git a/rethinkdb/version.py b/rethinkdb/version.py old mode 100644 new mode 100755 diff --git a/scripts/convert_protofile.py b/scripts/convert_protofile.py old mode 100644 new mode 100755 diff --git a/scripts/prepare_remote_test.py b/scripts/prepare_remote_test.py old mode 100644 new mode 100755 diff --git a/scripts/upload-coverage.sh b/scripts/upload-coverage.sh old mode 100644 new mode 100755 diff --git a/scripts/upload-pypi.sh b/scripts/upload-pypi.sh old mode 100644 new mode 100755 diff --git a/setup.cfg b/setup.cfg old mode 100644 new mode 100755 diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 diff --git a/tests/__init__.py b/tests/__init__.py old mode 100644 new mode 100755 diff --git a/tests/conftest.py b/tests/conftest.py old mode 100644 new mode 100755 diff --git a/tests/helpers.py b/tests/helpers.py old mode 100644 new mode 100755 diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_asyncio.py b/tests/integration/test_asyncio.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_connect.py b/tests/integration/test_connect.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_cursor.py b/tests/integration/test_cursor.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_data_write.py b/tests/integration/test_data_write.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_database.py b/tests/integration/test_database.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_date_and_time.py b/tests/integration/test_date_and_time.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_index.py b/tests/integration/test_index.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_ping.py b/tests/integration/test_ping.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_repl.py b/tests/integration/test_repl.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_table.py b/tests/integration/test_table.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_tornado.py b/tests/integration/test_tornado.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_trio.py b/tests/integration/test_trio.py old mode 100644 new mode 100755 diff --git a/tests/integration/test_write_hooks.py b/tests/integration/test_write_hooks.py old mode 100644 new mode 100755 diff --git a/tests/test_date_and_time.py b/tests/test_date_and_time.py old mode 100644 new mode 100755 diff --git a/tests/test_handshake.py b/tests/test_handshake.py old mode 100644 new mode 100755 diff --git a/tests/test_helpers.py b/tests/test_helpers.py old mode 100644 new mode 100755 diff --git a/tests/test_logger.py b/tests/test_logger.py old mode 100644 new mode 100755 diff --git a/tests/test_net.py b/tests/test_net.py old mode 100644 new mode 100755 diff --git a/tests/test_utils_common.py b/tests/test_utils_common.py old mode 100644 new mode 100755