Skip to content

Commit

Permalink
T197241: Directly reconnect database connection (#5)
Browse files Browse the repository at this point in the history
Directly reconnect database connection if this is the first query in this transaction and the connection is broken.
  • Loading branch information
viktordick authored Jun 12, 2020
1 parent 6011872 commit 86e4607
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 92 deletions.
7 changes: 7 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

2.4.7.dev1+perfact.2
--------------------

Directly reconnect broken connections if this is the first query in the
transaction instead of directly raising a RetryError.


2.4.7.dev1+perfact.1
--------------------
Collection of PerFact patches to the Product, including reorganization of how
Expand Down
2 changes: 1 addition & 1 deletion Products/ZPsycopgDA/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# their work without bothering about the module dependencies.

__doc__ = "ZPsycopg Database Adapter Registration."
__version__ = '2.4.7.dev1+perfact.1'
__version__ = '2.4.7.dev1+perfact.2'

# Python2 backward compatibility
try:
Expand Down
230 changes: 139 additions & 91 deletions Products/ZPsycopgDA/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def __init__(self, dsn, tilevel, typecasts, enc='utf-8',
# Connectors with tainted transactions
self.tainted = []

# Connectors that have uncommitted changes
self.in_transaction = set()

self.make_mappings()

self.pool = CustomConnectionPool(
Expand Down Expand Up @@ -179,8 +182,8 @@ def xid(self):
return xid

def _begin(self):
conn = self.getconn(False)
if self.use_tpc:
conn = self.getconn(False)
xid = self.xid()
conn.tpc_begin(xid)

Expand All @@ -205,6 +208,7 @@ def _finish(self, *ignored):
conn.tpc_commit()
else:
self._commit(put_connection=True)
self.in_transaction.discard(conn)

def _abort(self, *ignored):
# In cases where the _abort() occurs because the connection to the
Expand Down Expand Up @@ -297,52 +301,78 @@ def columns(self, table_name):
self.putconn()
return self.convert_description(c.description, True)

def handle_retry(self, error):
'''Find out if an error deserves a retry.'''
name = error.__class__.__name__
value = repr(error)
serialization_error = (
name == 'TransactionRollbackError' and
'could not serialize' in value
)
if serialization_error:
raise RetryError

# Errors that only affect our connection and where an immediate retry
# should work.
# AdminShutdown sounds bad, but it might only be our connection that is
# affected. When reconnecting after a regular Retry we see if it
# is actually something serious, in which case we will get something like
# 'the database is shutting down'. If it is only our connection, a
# simple reconnect will work.
connection_error = (
name in ('AdminShutdown', 'OperationalError') and (
'server closed the connection' in value or
'terminating connection due to administrator command' in value
)
@staticmethod
def split_error(error):
'''
Split error in name and value for inspection
'''
return (error.__class__.__name__, repr(error))

@staticmethod
def is_connection_error(error):
    """
    Decide whether *error* only affects our own connection, so that an
    immediate reconnect-and-retry should work.

    AdminShutdown sounds alarming, but it may be limited to our
    connection. When reconnecting after a regular retry we find out
    whether it is actually something serious — in that case we would see
    something like 'the database is shutting down'. If only our
    connection was hit, a simple reconnect succeeds.
    """
    name, value = DB.split_error(error)
    if name not in ('AdminShutdown', 'OperationalError', 'InterfaceError'):
        return False
    markers = (
        'server closed the connection',
        'terminating connection due to administrator command',
        'connection already closed',
    )
    return any(marker in value for marker in markers)

# Errors that indicate that the database encounters problems. Retry
# only after a few seconds.
server_error = (
@staticmethod
def is_server_error(error):
    """
    Decide whether *error* indicates that the database server itself is
    in trouble. Callers should retry only after waiting a few seconds.
    """
    name, value = DB.split_error(error)
    if name == 'OperationalError':
        return (
            'could not connect to server' in value or
            'the database system is shutting down' in value or
            'the database system is starting up' in value or
            'SSL connection has been closed unexpectedly' in value
        )
    if name == 'InterfaceError':
        return 'connection already closed' in value
    if name == 'NotSupportedError':
        return 'cannot set transaction read-write mode' in value
    return False

@staticmethod
def is_serialization_error(error):
    """
    Detect a serialization failure — the original case that warrants an
    immediate transaction retry (RetryError).
    """
    name, value = DB.split_error(error)
    if name != 'TransactionRollbackError':
        return False
    return 'could not serialize' in value


def handle_retry(self, error):
'''Find out if an error deserves a retry.'''
if self.is_serialization_error(error):
raise RetryError

connection_error = self.is_connection_error(error)
server_error = self.is_server_error(error)

if connection_error or server_error:
LOG.error(
name, value = self.split_error(error)
LOG.exception(
"Error on connection. Closing. ({}, {})".format(name, value)
)
self.getconn().close()
Expand All @@ -353,71 +383,89 @@ def handle_retry(self, error):
raise RetryDelayError

# query execution

def query(self, query_string, max_rows=None, query_data=None):
try:
self._register()
self.calls = self.calls+1
self._register()
self.calls = self.calls+1

conn = self.getconn()
if conn in self.tainted:
raise ValueError("Query attempted on tainted transaction.")
for retry in range(2):
try:
return self.query_inner(query_string, max_rows, query_data)
except Exception as err:
conn = self.getconn()
# First query in transaction yields a connection error - try to
# simply reconnect
if ((conn not in self.in_transaction)
and self.is_connection_error(err)):
LOG.warning(
"Connection error on first query in transaction, "
"reconnecting."
)
self.putconn(close=True)
continue
break

# We only reach this if another error occured
self.handle_retry(err)
self._abort()

# Taint this transaction
LOG.warning('query() tainting: {} in {}'.format(
conn, self.tainted))
if conn not in self.tainted:
self.tainted.append(conn)
raise err


def query_inner(self, query_string, max_rows=None, query_data=None):
conn = self.getconn()
if conn in self.tainted:
raise ValueError("Query attempted on tainted transaction.")

desc = ()
res = []
nselects = 0

desc = ()
res = []
nselects = 0
c = self.getcursor()

c = self.getcursor()
for qs in [x for x in query_string.split('\0') if x]:
# LOG.info("Trying to execute statement %s" % qs)
if query_data:
c.execute(qs, query_data)
else:
c.execute(qs)

if self.autocommit:
# LOG.info('Autocommitting.')
self._commit()

if c.description is not None:
nselects += 1
if c.description != desc and nselects > 1:
raise psycopg2.ProgrammingError(
'multiple selects in single query not allowed')
if max_rows:
res = c.fetchmany(max_rows)
# JJ 2017-07-20: Danger ahead. We might
# have many more rows in the database,
# which are truncated by max_rows. In that
# case, we should be able to react, by
# raising or logging.
if len(res) == max_rows:
try:
overshoot_result = c.fetchone()
except:
overshoot_result = None
if overshoot_result:
assert False, (
"This query has returned more than "
"max_rows results. Please raise "
"max_rows or limit in SQL.")

for qs in [x for x in query_string.split('\0') if x]:
# LOG.info("Trying to execute statement %s" % qs)
if query_data:
c.execute(qs, query_data)
else:
c.execute(qs)
if self.autocommit:
# LOG.info('Autocommitting.')
self._commit()

if c.description is not None:
nselects += 1
if c.description != desc and nselects > 1:
raise psycopg2.ProgrammingError(
'multiple selects in single query not allowed')
if max_rows:
res = c.fetchmany(max_rows)
# JJ 2017-07-20: Danger ahead. We might
# have many more rows in the database,
# which are truncated by max_rows. In that
# case, we should be able to react, by
# raising or logging.
if len(res) == max_rows:
try:
overshoot_result = c.fetchone()
except:
overshoot_result = None
if overshoot_result:
assert False, (
"This query has returned more than "
"max_rows results. Please raise "
"max_rows or limit in SQL.")

else:
res = c.fetchall()
desc = c.description

self.failures = 0

except Exception as err:
self.handle_retry(err)
self._abort()
res = c.fetchall()
desc = c.description

# Taint this transaction
conn = self.getconn()
LOG.warning('query() tainting: {} in {}'.format(
conn, self.tainted))
if conn not in self.tainted:
self.tainted.append(conn)
raise err
self.failures = 0

self.in_transaction.add(conn)
return self.convert_description(desc), res

0 comments on commit 86e4607

Please sign in to comment.