diff --git a/CHANGES.txt b/CHANGES.txt index b9ab0c7..fd4d747 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,13 @@ Changelog ========= +2.4.7.dev1+perfact.2 +-------------------- + +Directly reconnect broken connections if this is the first query in the +transaction instead of directly raising a RetryError. + + 2.4.7.dev1+perfact.1 ------------------- Collection of PerFact patches to the Product, including reorganization of how diff --git a/Products/ZPsycopgDA/__init__.py b/Products/ZPsycopgDA/__init__.py index e5ded6c..166ad40 100644 --- a/Products/ZPsycopgDA/__init__.py +++ b/Products/ZPsycopgDA/__init__.py @@ -16,7 +16,7 @@ # their work without bothering about the module dependencies. __doc__ = "ZPsycopg Database Adapter Registration." -__version__ = '2.4.7.dev1+perfact.1' +__version__ = '2.4.7.dev1+perfact.2' # Python2 backward compatibility try: diff --git a/Products/ZPsycopgDA/db.py b/Products/ZPsycopgDA/db.py index f297b1f..2fa5fc4 100644 --- a/Products/ZPsycopgDA/db.py +++ b/Products/ZPsycopgDA/db.py @@ -132,6 +132,9 @@ def __init__(self, dsn, tilevel, typecasts, enc='utf-8', # Connectors with tainted transactions self.tainted = [] + # Connectors that have uncommited changes + self.in_transaction = set() + self.make_mappings() self.pool = CustomConnectionPool( @@ -179,8 +182,8 @@ def xid(self): return xid def _begin(self): + conn = self.getconn(False) if self.use_tpc: - conn = self.getconn(False) xid = self.xid() conn.tpc_begin(xid) @@ -205,6 +208,7 @@ def _finish(self, *ignored): conn.tpc_commit() else: self._commit(put_connection=True) + self.in_transaction.discard(conn) def _abort(self, *ignored): # In cases where the _abort() occurs because the connection to the @@ -297,52 +301,78 @@ def columns(self, table_name): self.putconn() return self.convert_description(c.description, True) - def handle_retry(self, error): - '''Find out if an error deserves a retry.''' - name = error.__class__.__name__ - value = repr(error) - serialization_error = ( - name == 'TransactionRollbackError' and - 'could not serialize' in value - ) - if serialization_error: - raise RetryError - - # Errors that only affect our connection and where an immediate retry - # should work. - # AdminShutdown sounds bad, but it might only be our connection that is - # affected. When reconnecting after a regular Retry we see if it - # acutually something serious, in which case we will get something like - # 'the database is shutting down'. If it is only our connection, a - # simple reconnect will work. - connection_error = ( - name in ('AdminShutdown', 'OperationalError') and ( - 'server closed the connection' in value or - 'terminating connection due to administrator command' in value - ) + @staticmethod + def split_error(error): + ''' + Split error in name and value for inspection + ''' + return (error.__class__.__name__, repr(error)) + + @staticmethod + def is_connection_error(error): + ''' + Errors that only affect our connection and where an immediate retry + should work. + AdminShutdown sounds bad, but it might only be our connection that is + affected. When reconnecting after a regular Retry we see if it + acutually something serious, in which case we will get something like + 'the database is shutting down'. If it is only our connection, a simple + reconnect will work. + ''' + (name, value) = DB.split_error(error) + return name in ( + 'AdminShutdown', + 'OperationalError', + 'InterfaceError' + ) and ( + 'server closed the connection' in value or + 'terminating connection due to administrator command' in value or + 'connection already closed' in value ) - # Errors that indicate that the database encounters problems. Retry - # only after a few seconds. - server_error = ( + @staticmethod + def is_server_error(error): + ''' + Errors that indicate that the database encounters problems. Retry only + after a few seconds. + ''' + (name, value) = DB.split_error(error) + return ( name == 'OperationalError' and ( 'could not connect to server' in value or 'the database system is shutting down' in value or 'the database system is starting up' in value or 'SSL connection has been closed unexpectedly' in value ) - ) or ( - name == 'InterfaceError' and ( - 'connection already closed' in value - ) ) or ( name == 'NotSupportedError' and ( 'cannot set transaction read-write mode' in value ) ) + @staticmethod + def is_serialization_error(error): + ''' + Original retry eror in case of serialization failures. + ''' + (name, value) = DB.split_error(error) + return ( + name == 'TransactionRollbackError' and + 'could not serialize' in value + ) + + + def handle_retry(self, error): + '''Find out if an error deserves a retry.''' + if self.is_serialization_error(error): + raise RetryError + + connection_error = self.is_connection_error(error) + server_error = self.is_server_error(error) + if connection_error or server_error: - LOG.error( + name, value = self.split_error(error) + LOG.exception( "Error on connection. Closing. ({}, {})".format(name, value) ) self.getconn().close() @@ -353,71 +383,89 @@ def handle_retry(self, error): raise RetryDelayError # query execution - def query(self, query_string, max_rows=None, query_data=None): - try: - self._register() - self.calls = self.calls+1 + self._register() + self.calls = self.calls+1 - conn = self.getconn() - if conn in self.tainted: - raise ValueError("Query attempted on tainted transaction.") + for retry in range(2): + try: + return self.query_inner(query_string, max_rows, query_data) + except Exception as err: + conn = self.getconn() + # First query in transaction yields a connection error - try to + # simply reconnect + if ((conn not in self.in_transaction) + and self.is_connection_error(err)): + LOG.warning( + "Connection error on first query in transaction, " + "reconnecting." + ) + self.putconn(close=True) + continue + break + + # We only reach this if another error occured + self.handle_retry(err) + self._abort() + + # Taint this transaction + LOG.warning('query() tainting: {} in {}'.format( + conn, self.tainted)) + if conn not in self.tainted: + self.tainted.append(conn) + raise err + + + def query_inner(self, query_string, max_rows=None, query_data=None): + conn = self.getconn() + if conn in self.tainted: + raise ValueError("Query attempted on tainted transaction.") + + desc = () + res = [] + nselects = 0 - desc = () - res = [] - nselects = 0 + c = self.getcursor() - c = self.getcursor() + for qs in [x for x in query_string.split('\0') if x]: + # LOG.info("Trying to execute statement %s" % qs) + if query_data: + c.execute(qs, query_data) + else: + c.execute(qs) + + if self.autocommit: + # LOG.info('Autocommitting.') + self._commit() + + if c.description is not None: + nselects += 1 + if c.description != desc and nselects > 1: + raise psycopg2.ProgrammingError( + 'multiple selects in single query not allowed') + if max_rows: + res = c.fetchmany(max_rows) + # JJ 2017-07-20: Danger ahead. We might + # have many more rows in the database, + # which are truncated by max_rows. In that + # case, we should be able to react, by + # raising or logging. + if len(res) == max_rows: + try: + overshoot_result = c.fetchone() + except: + overshoot_result = None + if overshoot_result: + assert False, ( + "This query has returned more than " + "max_rows results. Please raise " + "max_rows or limit in SQL.") - for qs in [x for x in query_string.split('\0') if x]: - # LOG.info("Trying to execute statement %s" % qs) - if query_data: - c.execute(qs, query_data) else: - c.execute(qs) - if self.autocommit: - # LOG.info('Autocommitting.') - self._commit() - - if c.description is not None: - nselects += 1 - if c.description != desc and nselects > 1: - raise psycopg2.ProgrammingError( - 'multiple selects in single query not allowed') - if max_rows: - res = c.fetchmany(max_rows) - # JJ 2017-07-20: Danger ahead. We might - # have many more rows in the database, - # which are truncated by max_rows. In that - # case, we should be able to react, by - # raising or logging. - if len(res) == max_rows: - try: - overshoot_result = c.fetchone() - except: - overshoot_result = None - if overshoot_result: - assert False, ( - "This query has returned more than " - "max_rows results. Please raise " - "max_rows or limit in SQL.") - - else: - res = c.fetchall() - desc = c.description - - self.failures = 0 - - except Exception as err: - self.handle_retry(err) - self._abort() + res = c.fetchall() + desc = c.description - # Taint this transaction - conn = self.getconn() - LOG.warning('query() tainting: {} in {}'.format( - conn, self.tainted)) - if conn not in self.tainted: - self.tainted.append(conn) - raise err + self.failures = 0 + self.in_transaction.add(conn) return self.convert_description(desc), res