Skip to content

Commit

Permalink
Merge pull request #15229 from Automattic/vkarpov15/gh-15201
Browse files Browse the repository at this point in the history
feat(connection): make connection helpers respect bufferTimeoutMS
  • Loading branch information
vkarpov15 authored Feb 4, 2025
2 parents 0722e8f + 5fad5a3 commit 5fd5b4d
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 16 deletions.
8 changes: 1 addition & 7 deletions lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,7 @@ Collection.prototype._getBufferTimeoutMS = function _getBufferTimeoutMS() {
if (opts && opts.schemaUserProvidedOptions != null && opts.schemaUserProvidedOptions.bufferTimeoutMS != null) {
return opts.schemaUserProvidedOptions.bufferTimeoutMS;
}
if (conn.config.bufferTimeoutMS != null) {
return conn.config.bufferTimeoutMS;
}
if (conn.base != null && conn.base.get('bufferTimeoutMS') != null) {
return conn.base.get('bufferTimeoutMS');
}
return 10000;
return conn._getBufferTimeoutMS();
};

/*!
Expand Down
54 changes: 51 additions & 3 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -824,10 +824,54 @@ Connection.prototype.dropCollection = async function dropCollection(collection)

Connection.prototype._waitForConnect = async function _waitForConnect() {
if ((this.readyState === STATES.connecting || this.readyState === STATES.disconnected) && this._shouldBufferCommands()) {
await new Promise(resolve => {
this._queue.push({ fn: resolve });
});
const bufferTimeoutMS = this._getBufferTimeoutMS();
let timeout = null;
let timedOut = false;
// The element that this function pushes onto `_queue`, stored to make it easy to remove later
const queueElement = {};
await Promise.race([
new Promise(resolve => {
queueElement.fn = resolve;
this._queue.push(queueElement);
}),
new Promise(resolve => {
timeout = setTimeout(
() => {
timedOut = true;
resolve();
},
bufferTimeoutMS
);
})
]);

if (timedOut) {
const index = this._queue.indexOf(queueElement);
if (index !== -1) {
this._queue.splice(index, 1);
}
const message = 'Connection operation buffering timed out after ' + bufferTimeoutMS + 'ms';
throw new MongooseError(message);
} else if (timeout != null) {
// Not strictly necessary, but avoid the extra overhead of creating a new MongooseError
// in case of success
clearTimeout(timeout);
}
}
};

/*!
* Get the default buffer timeout for this connection
*/

Connection.prototype._getBufferTimeoutMS = function _getBufferTimeoutMS() {
if (this.config.bufferTimeoutMS != null) {
return this.config.bufferTimeoutMS;
}
if (this.base != null && this.base.get('bufferTimeoutMS') != null) {
return this.base.get('bufferTimeoutMS');
}
return 10000;
};

/**
Expand Down Expand Up @@ -1156,6 +1200,10 @@ Connection.prototype.close = async function close(force) {
this.$wasForceClosed = !!force;
}

if (this._lastHeartbeatAt != null) {
this._lastHeartbeatAt = null;
}

for (const model of Object.values(this.models)) {
// If manually disconnecting, make sure to clear each model's `$init`
// promise, so Mongoose knows to re-run `init()` in case the
Expand Down
8 changes: 8 additions & 0 deletions lib/drivers/node-mongodb-native/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ NativeConnection.prototype.createClient = async function createClient(uri, optio
delete options.autoSearchIndex;
}

if ('bufferTimeoutMS' in options) {
this.config.bufferTimeoutMS = options.bufferTimeoutMS;
delete options.bufferTimeoutMS;
}

// Backwards compat
if (options.user || options.pass) {
options.auth = options.auth || {};
Expand Down Expand Up @@ -426,6 +431,9 @@ function _setClient(conn, client, options, dbName) {
}
});
}

conn._lastHeartbeatAt = null;

client.on('serverHeartbeatSucceeded', () => {
conn._lastHeartbeatAt = Date.now();
});
Expand Down
7 changes: 1 addition & 6 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ const prepareDiscriminatorPipeline = require('./helpers/aggregate/prepareDiscrim
const pushNestedArrayPaths = require('./helpers/model/pushNestedArrayPaths');
const removeDeselectedForeignField = require('./helpers/populate/removeDeselectedForeignField');
const setDottedPath = require('./helpers/path/setDottedPath');
const STATES = require('./connectionState');
const util = require('util');
const utils = require('./utils');
const minimize = require('./helpers/minimize');
Expand Down Expand Up @@ -1104,11 +1103,7 @@ Model.init = function init() {
return results;
};
const _createCollection = async() => {
if ((conn.readyState === STATES.connecting || conn.readyState === STATES.disconnected) && conn._shouldBufferCommands()) {
await new Promise(resolve => {
conn._queue.push({ fn: resolve });
});
}
await conn._waitForConnect();
const autoCreate = utils.getOption(
'autoCreate',
this.schema.options,
Expand Down
10 changes: 10 additions & 0 deletions test/collection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ describe('collections:', function() {
});
});

it('handles bufferTimeoutMS in schemaUserProvidedOptions', async function() {
db = mongoose.createConnection();
const collection = db.collection('gh14184');
collection.opts.schemaUserProvidedOptions = { bufferTimeoutMS: 100 };

const err = await collection.find({ foo: 'bar' }, {}).then(() => null, err => err);
assert.ok(err);
assert.ok(err.message.includes('buffering timed out after 100ms'));
});

it('methods should that throw (unimplemented)', function() {
const collection = new Collection('test', mongoose.connection);
let thrown = false;
Expand Down
21 changes: 21 additions & 0 deletions test/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1787,6 +1787,27 @@ describe('connections:', function() {
assert.ok(res.mongoose.results[1].message.includes('not a number'));
});

it('buffers connection helpers', async function() {
const m = new mongoose.Mongoose();

const promise = m.connection.listCollections();

await new Promise(resolve => setTimeout(resolve, 100));
await m.connect(start.uri, { bufferTimeoutMS: 1000 });
await promise;

await m.connection.listCollections();

await m.disconnect();
});

it('connection helpers buffering times out', async function() {
const m = new mongoose.Mongoose();
m.set('bufferTimeoutMS', 100);

await assert.rejects(m.connection.listCollections(), /Connection operation buffering timed out after 100ms/);
});

it('supports db-level aggregate on connection (gh-15118)', async function() {
const db = start();

Expand Down

0 comments on commit 5fd5b4d

Please sign in to comment.