From e85e59c0f126624129182dfa6a377311e0c0ef2f Mon Sep 17 00:00:00 2001 From: titanism <101466223+titanism@users.noreply.github.com> Date: Sat, 19 Oct 2024 21:28:52 -0500 Subject: [PATCH] fix: wrap all mongoose -> sqlite helpers with retry (in case SQLITE_BUSY code due to another thread having open fd read/write in use with sqlite db) --- helpers/mongoose-to-sqlite.js | 278 +++++++++++++++++++++------------- 1 file changed, 176 insertions(+), 102 deletions(-) diff --git a/helpers/mongoose-to-sqlite.js b/helpers/mongoose-to-sqlite.js index 094d9c025e..29311acf77 100644 --- a/helpers/mongoose-to-sqlite.js +++ b/helpers/mongoose-to-sqlite.js @@ -9,10 +9,14 @@ const Database = require('better-sqlite3-multiple-ciphers'); const _ = require('lodash'); const isSANB = require('is-string-and-not-blank'); const mongoose = require('mongoose'); +const pRetry = require('p-retry'); const safeStringify = require('fast-safe-stringify'); const { Builder } = require('json-sql'); +const config = require('#config'); +const logger = require('#helpers/logger'); const env = require('#config/env'); +const isRetryableError = require('#helpers/is-retryable-error'); const recursivelyParse = require('#helpers/recursively-parse'); const builder = new Builder(); @@ -164,14 +168,17 @@ async function updateMany( }); if (instance.wsp) { - beforeDocs = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [ - ['prepare', sql.query], - ['all', sql.values] - ] - }); + beforeDocs = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [ + ['prepare', sql.query], + ['all', sql.values] + ] + }, + 0 + ); } else { beforeDocs = session.db.prepare(sql.query).all(sql.values); } @@ -203,15 +210,18 @@ async function updateMany( }); if (session.db.readonly) { - docs = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [ - ['prepare', sql.query], - ['all', sql.values] - ], - checkpoint: 'PASSIVE' - }); + docs = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [ + ['prepare', sql.query], + ['all', sql.values] + ], + checkpoint: 'PASSIVE' + }, + 0 + ); } else { docs = session.db.prepare(sql.query).all(sql.values); session.db.pragma('wal_checkpoint(PASSIVE)'); @@ -259,14 +269,17 @@ async function countDocuments(instance, session, filter = {}) { let result; if (instance.wsp) { - result = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [ - ['prepare', sql.query], - ['get', sql.values] - ] - }); + result = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [ + ['prepare', sql.query], + ['get', sql.values] + ] + }, + 0 + ); } else { result = session.db.prepare(sql.query).get(sql.values); } @@ -305,15 +318,18 @@ async function deleteMany(instance, session, condition = {}, options = {}) { try { // use websockets if readonly if (session.db.readonly) { - result = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [ - ['prepare', sql.query], - ['run', sql.values] - ], - checkpoint: 'PASSIVE' - }); + result = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [ + ['prepare', sql.query], + ['run', sql.values] + ], + checkpoint: 'PASSIVE' + }, + 0 + ); } else { result = session.db.prepare(sql.query).run(sql.values); session.db.pragma('wal_checkpoint(PASSIVE)'); @@ -369,15 +385,18 @@ async function deleteOne(instance, session, conditions = {}, options = {}) { try { // use websockets if readonly if (session.db.readonly) { - result = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [ - ['prepare', sql.query], - ['run', sql.values] - ], - checkpoint: 'PASSIVE' - }); + result = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [ + ['prepare', sql.query], + ['run', sql.values] + ], + checkpoint: 'PASSIVE' + }, + 0 + ); } else { result = session.db.prepare(sql.query).run(sql.values); session.db.pragma('wal_checkpoint(PASSIVE)'); @@ -451,14 +470,17 @@ async function find( let docs; if (instance.wsp) { - docs = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [ - ['prepare', sql.query], - ['all', sql.values] - ] - }); + docs = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [ + ['prepare', sql.query], + ['all', sql.values] + ] + }, + 0 + ); } else { docs = session.db.prepare(sql.query).all(sql.values); } @@ -527,14 +549,17 @@ async function findOne( let doc; if (instance.wsp) { - doc = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [ - ['prepare', sql.query], - ['get', sql.values] - ] - }); + doc = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [ + ['prepare', sql.query], + ['get', sql.values] + ] + }, + 0 + ); } else { doc = session.db.prepare(sql.query).get(sql.values); } @@ -586,15 +611,18 @@ async function $__handleSave(options = {}, fn) { // use websockets if readonly if (this.session.db.readonly) { - doc = await this.instance.wsp.request({ - action: 'stmt', - session: { user: this.session.user }, - stmt: [ - ['prepare', sql.query], - ['get', sql.values] - ], - checkpoint: 'PASSIVE' - }); + doc = await this.instance.wsp.request( + { + action: 'stmt', + session: { user: this.session.user }, + stmt: [ + ['prepare', sql.query], + ['get', sql.values] + ], + checkpoint: 'PASSIVE' + }, + 0 + ); } else { doc = this.session.db.prepare(sql.query).get(sql.values); this.session.db.pragma('wal_checkpoint(PASSIVE)'); @@ -611,15 +639,18 @@ async function $__handleSave(options = {}, fn) { }); // use websockets if readonly if (this.session.db.readonly) { - doc = await this.instance.wsp.request({ - action: 'stmt', - session: { user: this.session.user }, - stmt: [ - ['prepare', sql.query], - ['get', sql.values] - ], - checkpoint: 'PASSIVE' - }); + doc = await this.instance.wsp.request( + { + action: 'stmt', + session: { user: this.session.user }, + stmt: [ + ['prepare', sql.query], + ['get', sql.values] + ], + checkpoint: 'PASSIVE' + }, + 0 + ); } else { doc = this.session.db.prepare(sql.query).get(sql.values); this.session.db.pragma('wal_checkpoint(PASSIVE)'); @@ -787,15 +818,18 @@ async function findOneAndUpdate( }); if (session.db.readonly) { - doc = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [ - ['prepare', sql.query], - ['get', sql.values] - ], - checkpoint: 'PASSIVE' - }); + doc = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [ + ['prepare', sql.query], + ['get', sql.values] + ], + checkpoint: 'PASSIVE' + }, + 0 + ); } else { doc = session.db.prepare(sql.query).get(sql.values); session.db.pragma('wal_checkpoint(PASSIVE)'); @@ -838,11 +872,14 @@ async function distinct(instance, session, field, conditions = {}) { let docs; if (instance.wsp) { - docs = await instance.wsp.request({ - action: 'stmt', - session: { user: session.user }, - stmt: [['prepare', sql.query], ['pluck'], ['all', sql.values]] - }); + docs = await instance.wsp.request( + { + action: 'stmt', + session: { user: session.user }, + stmt: [['prepare', sql.query], ['pluck'], ['all', sql.values]] + }, + 0 + ); } else { docs = session.db.prepare(sql.query).pluck().all(sql.values); } @@ -852,6 +889,26 @@ async function distinct(instance, session, field, conditions = {}) { return docs; } +function wrapWithRetry(fn, model) { + return function (...args) { + return pRetry(() => fn.call(model, ...args), { + retries: 3, + minTimeout: config.busyTimeout / 2, + maxTimeout: config.busyTimeout, + factor: 1, + onFailedAttempt(error) { + error.isCodeBug = true; + error.args = args; + logger.error(error); + + if (isRetryableError(error)) return; + + throw error; + } + }); + }; +} + function dummyProofModel(model) { // add createStatement and mapping const { createStatement, mapping } = parseSchema(model); @@ -866,19 +923,36 @@ function dummyProofModel(model) { model.prototype[method] = noop(method); } - model.countDocuments = countDocuments.bind(model); - model.find = find.bind(model); - model.findById = findById.bind(model); - model.findOne = findOne.bind(model); - model.deleteOne = deleteOne.bind(model); - model.deleteMany = deleteMany.bind(model); - model.findByIdAndUpdate = findByIdAndUpdate.bind(model); - model.findOneAndUpdate = findOneAndUpdate.bind(model); - model.distinct = distinct.bind(model); - model.updateMany = updateMany.bind(model); + model.countDocuments = wrapWithRetry(countDocuments, model); + model.find = wrapWithRetry(find, model); + model.findById = wrapWithRetry(findById, model); + model.findOne = wrapWithRetry(findOne, model); + model.deleteOne = wrapWithRetry(deleteOne, model); + model.deleteMany = wrapWithRetry(deleteMany, model); + model.findByIdAndUpdate = wrapWithRetry(findByIdAndUpdate, model); + model.findOneAndUpdate = wrapWithRetry(findOneAndUpdate, model); + model.distinct = wrapWithRetry(distinct, model); + model.updateMany = wrapWithRetry(updateMany, model); // - model.prototype.$__handleSave = $__handleSave; + model.prototype.$__handleSave = function (...args) { + const that = this; + return pRetry(() => $__handleSave.call(that, ...args), { + retries: 3, + minTimeout: config.busyTimeout / 2, + maxTimeout: config.busyTimeout, + factor: 1, + onFailedAttempt(error) { + error.isCodeBug = true; + error.args = args; + logger.error(error); + + if (isRetryableError(error)) return; + + throw error; + } + }); + }; return model; }