Skip to content

Commit a548a2e

Browse files
authored
Merge pull request #321 from share/delay-op-for-query-poll
Delay ops sent by query subscriptions until after polling is complete
2 parents 277be09 + 7d67a08 commit a548a2e

File tree

2 files changed

+56
-10
lines changed

2 files changed

+56
-10
lines changed

lib/query-emitter.js

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ QueryEmitter.prototype._emitTiming = function(action, start) {
6363
};
6464

6565
QueryEmitter.prototype._update = function(op) {
66+
// Note that `op` should not be projected or sanitized yet. It's possible for
67+
// a query to filter on a field that's not in the projection. skipPoll checks
68+
// to see if an op could possibly affect a query, so it should get passed the
69+
// full op. The onOp listener function must call backend.sanitizeOp()
6670
var id = op.d;
71+
var pollCallback = this._defaultCallback;
6772

6873
// Check if the op's id matches the query before updating the query results
6974
// and send it through immediately if it does. The current snapshot
@@ -90,28 +95,35 @@ QueryEmitter.prototype._update = function(op) {
9095
// that removed the doc from the query to cause the client-side computed
9196
// list to update.
9297
if (this.ids.indexOf(id) !== -1) {
93-
// Note that this op is not projected or sanitized yet. It's possible for a
94-
// query to filter on a field that's not in the projection. skipPoll checks
95-
// to see if an op could possibly affect a query, so it should get passed
96-
// the full op. The onOp listener function must call backend.sanitizeOp()
97-
this.onOp(op);
98+
var emitter = this;
99+
pollCallback = function(err) {
100+
// Send op regardless of polling error. Clients handle subscription to ops
101+
// on the documents that currently match query results independently from
102+
// updating which docs match the query
103+
emitter.onOp(op);
104+
if (err) emitter.onError(err);
105+
};
98106
}
99107

100108
// Ignore if the database or user function says we don't need to poll
101109
try {
102-
if (this.db.skipPoll(this.collection, id, op, this.query)) return this._defaultCallback();
103-
if (this.skipPoll(this.collection, id, op, this.query)) return this._defaultCallback();
110+
if (
111+
this.db.skipPoll(this.collection, id, op, this.query) ||
112+
this.skipPoll(this.collection, id, op, this.query)
113+
) {
114+
return pollCallback();
115+
}
104116
} catch (err) {
105-
return this._defaultCallback(err);
117+
return pollCallback(err);
106118
}
107119
if (this.canPollDoc) {
108120
// We can query against only the document that was modified to see if the
109121
// op has changed whether or not it matches the results
110-
this.queryPollDoc(id, this._defaultCallback);
122+
this.queryPollDoc(id, pollCallback);
111123
} else {
112124
// We need to do a full poll of the query, because the query uses limits,
113125
// sorts, or something special
114-
this.queryPoll(this._defaultCallback);
126+
this.queryPoll(pollCallback);
115127
}
116128
};
117129

test/client/query-subscribe.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,40 @@ module.exports = function(options) {
8383
});
8484
});
8585

86+
it('subscribed query removes document from results before sending delete op to other clients', function(done) {
87+
var connection1 = this.backend.connect();
88+
var connection2 = this.backend.connect();
89+
var matchAllDbQuery = this.matchAllDbQuery;
90+
async.parallel([
91+
function(cb) {
92+
connection1.get('dogs', 'fido').create({age: 3}, cb);
93+
},
94+
function(cb) {
95+
connection1.get('dogs', 'spot').create({age: 5}, cb);
96+
}
97+
], function(err) {
98+
if (err) return done(err);
99+
var query = connection2.createSubscribeQuery('dogs', matchAllDbQuery, null, function(err) {
100+
if (err) return done(err);
101+
connection1.get('dogs', 'fido').del();
102+
});
103+
var removed = false;
104+
connection2.get('dogs', 'fido').on('del', function() {
105+
expect(removed).equal(true);
106+
done();
107+
});
108+
query.on('remove', function(docs, index) {
109+
removed = true;
110+
expect(util.pluck(docs, 'id')).eql(['fido']);
111+
expect(util.pluck(docs, 'data')).eql([{age: 3}]);
112+
expect(index).a('number');
113+
var results = util.sortById(query.results);
114+
expect(util.pluck(results, 'id')).eql(['spot']);
115+
expect(util.pluck(results, 'data')).eql([{age: 5}]);
116+
});
117+
});
118+
});
119+
86120
it('subscribed query does not get updated after destroyed', function(done) {
87121
var connection = this.backend.connect();
88122
var connection2 = this.backend.connect();

0 commit comments

Comments
 (0)