Skip to content

Commit

Permalink
fix: wrap all mongoose -> sqlite helpers with retry (in case SQLITE_B…
Browse files Browse the repository at this point in the history
…USY code due to another thread having open fd read/write in use with sqlite db)
  • Loading branch information
titanism committed Oct 20, 2024
1 parent 3d612b4 commit e85e59c
Showing 1 changed file with 176 additions and 102 deletions.
278 changes: 176 additions & 102 deletions helpers/mongoose-to-sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ const Database = require('better-sqlite3-multiple-ciphers');
const _ = require('lodash');
const isSANB = require('is-string-and-not-blank');
const mongoose = require('mongoose');
const pRetry = require('p-retry');
const safeStringify = require('fast-safe-stringify');
const { Builder } = require('json-sql');

const config = require('#config');
const logger = require('#helpers/logger');
const env = require('#config/env');
const isRetryableError = require('#helpers/is-retryable-error');
const recursivelyParse = require('#helpers/recursively-parse');

const builder = new Builder();
Expand Down Expand Up @@ -164,14 +168,17 @@ async function updateMany(
});

if (instance.wsp) {
beforeDocs = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['all', sql.values]
]
});
beforeDocs = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['all', sql.values]
]
},
0
);
} else {
beforeDocs = session.db.prepare(sql.query).all(sql.values);
}
Expand Down Expand Up @@ -203,15 +210,18 @@ async function updateMany(
});

if (session.db.readonly) {
docs = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['all', sql.values]
],
checkpoint: 'PASSIVE'
});
docs = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['all', sql.values]
],
checkpoint: 'PASSIVE'
},
0
);
} else {
docs = session.db.prepare(sql.query).all(sql.values);
session.db.pragma('wal_checkpoint(PASSIVE)');
Expand Down Expand Up @@ -259,14 +269,17 @@ async function countDocuments(instance, session, filter = {}) {

let result;
if (instance.wsp) {
result = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
]
});
result = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
]
},
0
);
} else {
result = session.db.prepare(sql.query).get(sql.values);
}
Expand Down Expand Up @@ -305,15 +318,18 @@ async function deleteMany(instance, session, condition = {}, options = {}) {
try {
// use websockets if readonly
if (session.db.readonly) {
result = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['run', sql.values]
],
checkpoint: 'PASSIVE'
});
result = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['run', sql.values]
],
checkpoint: 'PASSIVE'
},
0
);
} else {
result = session.db.prepare(sql.query).run(sql.values);
session.db.pragma('wal_checkpoint(PASSIVE)');
Expand Down Expand Up @@ -369,15 +385,18 @@ async function deleteOne(instance, session, conditions = {}, options = {}) {
try {
// use websockets if readonly
if (session.db.readonly) {
result = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['run', sql.values]
],
checkpoint: 'PASSIVE'
});
result = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['run', sql.values]
],
checkpoint: 'PASSIVE'
},
0
);
} else {
result = session.db.prepare(sql.query).run(sql.values);
session.db.pragma('wal_checkpoint(PASSIVE)');
Expand Down Expand Up @@ -451,14 +470,17 @@ async function find(

let docs;
if (instance.wsp) {
docs = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['all', sql.values]
]
});
docs = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['all', sql.values]
]
},
0
);
} else {
docs = session.db.prepare(sql.query).all(sql.values);
}
Expand Down Expand Up @@ -527,14 +549,17 @@ async function findOne(
let doc;

if (instance.wsp) {
doc = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
]
});
doc = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
]
},
0
);
} else {
doc = session.db.prepare(sql.query).get(sql.values);
}
Expand Down Expand Up @@ -586,15 +611,18 @@ async function $__handleSave(options = {}, fn) {

// use websockets if readonly
if (this.session.db.readonly) {
doc = await this.instance.wsp.request({
action: 'stmt',
session: { user: this.session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
],
checkpoint: 'PASSIVE'
});
doc = await this.instance.wsp.request(
{
action: 'stmt',
session: { user: this.session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
],
checkpoint: 'PASSIVE'
},
0
);
} else {
doc = this.session.db.prepare(sql.query).get(sql.values);
this.session.db.pragma('wal_checkpoint(PASSIVE)');
Expand All @@ -611,15 +639,18 @@ async function $__handleSave(options = {}, fn) {
});
// use websockets if readonly
if (this.session.db.readonly) {
doc = await this.instance.wsp.request({
action: 'stmt',
session: { user: this.session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
],
checkpoint: 'PASSIVE'
});
doc = await this.instance.wsp.request(
{
action: 'stmt',
session: { user: this.session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
],
checkpoint: 'PASSIVE'
},
0
);
} else {
doc = this.session.db.prepare(sql.query).get(sql.values);
this.session.db.pragma('wal_checkpoint(PASSIVE)');
Expand Down Expand Up @@ -787,15 +818,18 @@ async function findOneAndUpdate(
});

if (session.db.readonly) {
doc = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
],
checkpoint: 'PASSIVE'
});
doc = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
],
checkpoint: 'PASSIVE'
},
0
);
} else {
doc = session.db.prepare(sql.query).get(sql.values);
session.db.pragma('wal_checkpoint(PASSIVE)');
Expand Down Expand Up @@ -838,11 +872,14 @@ async function distinct(instance, session, field, conditions = {}) {
let docs;

if (instance.wsp) {
docs = await instance.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [['prepare', sql.query], ['pluck'], ['all', sql.values]]
});
docs = await instance.wsp.request(
{
action: 'stmt',
session: { user: session.user },
stmt: [['prepare', sql.query], ['pluck'], ['all', sql.values]]
},
0
);
} else {
docs = session.db.prepare(sql.query).pluck().all(sql.values);
}
Expand All @@ -852,6 +889,26 @@ async function distinct(instance, session, field, conditions = {}) {
return docs;
}

function wrapWithRetry(fn, model) {
return function (...args) {
return pRetry(() => fn.call(model, ...args), {
retries: 3,
minTimeout: config.busyTimeout / 2,
maxTimeout: config.busyTimeout,
factor: 1,
onFailedAttempt(error) {
error.isCodeBug = true;
error.args = args;
logger.error(error);

if (isRetryableError(error)) return;

throw error;
}
});
};
}

function dummyProofModel(model) {
// add createStatement and mapping
const { createStatement, mapping } = parseSchema(model);
Expand All @@ -866,19 +923,36 @@ function dummyProofModel(model) {
model.prototype[method] = noop(method);
}

model.countDocuments = countDocuments.bind(model);
model.find = find.bind(model);
model.findById = findById.bind(model);
model.findOne = findOne.bind(model);
model.deleteOne = deleteOne.bind(model);
model.deleteMany = deleteMany.bind(model);
model.findByIdAndUpdate = findByIdAndUpdate.bind(model);
model.findOneAndUpdate = findOneAndUpdate.bind(model);
model.distinct = distinct.bind(model);
model.updateMany = updateMany.bind(model);
model.countDocuments = wrapWithRetry(countDocuments, model);
model.find = wrapWithRetry(find, model);
model.findById = wrapWithRetry(findById, model);
model.findOne = wrapWithRetry(findOne, model);
model.deleteOne = wrapWithRetry(deleteOne, model);
model.deleteMany = wrapWithRetry(deleteMany, model);
model.findByIdAndUpdate = wrapWithRetry(findByIdAndUpdate, model);
model.findOneAndUpdate = wrapWithRetry(findOneAndUpdate, model);
model.distinct = wrapWithRetry(distinct, model);
model.updateMany = wrapWithRetry(updateMany, model);

// <https://github.com/Automattic/mongoose/blob/7efa1512915c5527bc53d81a2effd3d539324875/lib/model.js#L311-L318>
model.prototype.$__handleSave = $__handleSave;
model.prototype.$__handleSave = function (...args) {
const that = this;
return pRetry(() => $__handleSave.call(that, ...args), {
retries: 3,
minTimeout: config.busyTimeout / 2,
maxTimeout: config.busyTimeout,
factor: 1,
onFailedAttempt(error) {
error.isCodeBug = true;
error.args = args;
logger.error(error);

if (isRetryableError(error)) return;

throw error;
}
});
};

return model;
}
Expand Down

0 comments on commit e85e59c

Please sign in to comment.