Skip to content

Commit 02ef89b

Browse files
committed
[IMP] queue_job: HA job runner using session level advisory lock
1 parent e267d92 commit 02ef89b

File tree

1 file changed

+24
-2
lines changed

1 file changed

+24
-2
lines changed

queue_job/jobrunner/runner.py

+24-2
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,17 @@
159159

160160
SELECT_TIMEOUT = 60
161161
ERROR_RECOVERY_DELAY = 5
162+
PG_ADVISORY_LOCK_ID = 2293787760715711918
162163

163164
_logger = logging.getLogger(__name__)
164165

165166
select = selectors.DefaultSelector
166167

167168

169+
class MasterElectionLost(Exception):
170+
pass
171+
172+
168173
# Unfortunately, it is not possible to extend the Odoo
169174
# server command line arguments, so we resort to environment variables
170175
# to configure the runner (channels mostly).
@@ -268,6 +273,7 @@ def __init__(self, db_name):
268273
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
269274
self.has_queue_job = self._has_queue_job()
270275
if self.has_queue_job:
276+
self._acquire_master_lock()
271277
self._initialize()
272278
except BaseException:
273279
self.close()
@@ -284,6 +290,14 @@ def close(self):
284290
pass
285291
self.conn = None
286292

293+
def _acquire_master_lock(self):
294+
"""Acquire the master runner lock or raise MasterElectionLost"""
295+
with closing(self.conn.cursor()) as cr:
296+
cr.execute("SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,))
297+
if not cr.fetchone()[0]:
298+
msg = f"Could not acquire master runner lock on {self.db_name}"
299+
raise MasterElectionLost(msg)
300+
287301
def _has_queue_job(self):
288302
with closing(self.conn.cursor()) as cr:
289303
cr.execute(
@@ -413,7 +427,7 @@ def get_db_names(self):
413427
db_names = config["db_name"].split(",")
414428
else:
415429
db_names = odoo.service.db.list_dbs(True)
416-
return db_names
430+
return sorted(db_names)
417431

418432
def close_databases(self, remove_jobs=True):
419433
for db_name, db in self.db_by_name.items():
@@ -522,7 +536,7 @@ def run(self):
522536
while not self._stop:
523537
# outer loop does exception recovery
524538
try:
525-
_logger.info("initializing database connections")
539+
_logger.debug("initializing database connections")
526540
# TODO: how to detect new databases or databases
527541
# on which queue_job is installed after server start?
528542
self.initialize_databases()
@@ -537,6 +551,14 @@ def run(self):
537551
except InterruptedError:
538552
# Interrupted system call, i.e. KeyboardInterrupt during select
539553
self.stop()
554+
except MasterElectionLost as e:
555+
_logger.debug(
556+
"master election lost: %s, sleeping %ds and retrying",
557+
e,
558+
ERROR_RECOVERY_DELAY,
559+
)
560+
self.close_databases()
561+
time.sleep(ERROR_RECOVERY_DELAY)
540562
except Exception:
541563
_logger.exception(
542564
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY

0 commit comments

Comments
 (0)