Skip to content

Commit

Permalink
STITCH-1986 - Add watch to remote mongo collection (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
dkaminsky authored Feb 26, 2019
1 parent 59230df commit 9d6307d
Show file tree
Hide file tree
Showing 48 changed files with 1,885 additions and 850 deletions.
3 changes: 1 addition & 2 deletions .evg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ tasks:
cd stitch-java-sdk
echo "running android tests"
echo "test.stitch.baseURL=http://10.0.2.2:9090" >> local.properties
./gradlew connectedAndroidTest --info --continue --warning-mode=all --stacktrace < /dev/null
./gradlew connectedAndroidTest jacocoTestReport --info --continue --warning-mode=all --stacktrace < /dev/null
- func: "publish_coveralls"

- name: run_android_tests_with_proguard
Expand Down Expand Up @@ -371,7 +371,6 @@ tasks:
echo "running android tests"
echo "test.stitch.baseURL=http://10.0.2.2:9090" >> local.properties
./gradlew connectedAndroidTest -PwithProguardMinification --info --continue --warning-mode=all --stacktrace < /dev/null
- func: "publish_coveralls"
- name: finalize_coverage
depends_on:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
import com.mongodb.stitch.android.services.mongodb.remote.RemoteMongoCollection;
import com.mongodb.stitch.core.auth.providers.serverapikey.ServerApiKeyCredential;
import com.mongodb.stitch.core.internal.common.BsonUtils;
import com.mongodb.stitch.core.services.mongodb.remote.ChangeEvent;
import com.mongodb.stitch.core.services.mongodb.remote.OperationType;
import com.mongodb.stitch.core.services.mongodb.remote.sync.ChangeEventListener;
import com.mongodb.stitch.core.services.mongodb.remote.sync.DefaultSyncConflictResolvers;
import com.mongodb.stitch.core.services.mongodb.remote.sync.SyncDeleteResult;
import com.mongodb.stitch.core.services.mongodb.remote.sync.internal.ChangeEvent;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -236,7 +237,7 @@ public void onEvent(final BsonValue documentId, final ChangeEvent<Document> even
private class ItemUpdateListener implements ChangeEventListener<TodoItem> {
@Override
public void onEvent(final BsonValue documentId, final ChangeEvent<TodoItem> event) {
if (event.getOperationType() == ChangeEvent.OperationType.DELETE) {
if (event.getOperationType() == OperationType.DELETE) {
todoAdapter.removeItemById(event.getDocumentKey().getObjectId("_id").getValue());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ import com.mongodb.stitch.core.admin.services.ServiceConfigs
import com.mongodb.stitch.core.admin.services.rules.RuleCreator
import com.mongodb.stitch.core.auth.providers.anonymous.AnonymousCredential
import com.mongodb.stitch.core.internal.common.BsonUtils
import com.mongodb.stitch.core.services.mongodb.remote.OperationType
import com.mongodb.stitch.core.services.mongodb.remote.RemoteCountOptions
import com.mongodb.stitch.core.services.mongodb.remote.RemoteUpdateOptions
import com.mongodb.stitch.core.testutils.CustomType
import org.bson.BsonDocument
import org.bson.BsonInt32
import org.bson.BsonString
import org.bson.Document
import org.bson.codecs.configuration.CodecConfigurationException
import org.bson.codecs.configuration.CodecRegistries
Expand Down Expand Up @@ -69,7 +73,7 @@ class RemoteMongoClientIntTests : BaseStitchAndroidIntTest() {
roles = listOf(RuleCreator.MongoDb.Role(
read = true, write = true
)),
schema = RuleCreator.MongoDb.Schema())
schema = RuleCreator.MongoDb.Schema().copy(properties = Document()))

addRule(svc.second, rule)

Expand Down Expand Up @@ -431,6 +435,95 @@ class RemoteMongoClientIntTests : BaseStitchAndroidIntTest() {
assertEquals(expected, Tasks.await(Tasks.await(iter.iterator()).next()))
}

@Test
fun testWatchBsonValueIDs() {
val coll = getTestColl()
assertEquals(0, Tasks.await(coll.count()))

val rawDoc1 = Document()
rawDoc1["_id"] = 1
rawDoc1["hello"] = "world"

val rawDoc2 = Document()
rawDoc2["_id"] = "foo"
rawDoc2["happy"] = "day"

Tasks.await(coll.insertOne(rawDoc1))
assertEquals(1, Tasks.await(coll.count()))

val streamTask = coll.watch(BsonInt32(1), BsonString("foo"))
val stream = Tasks.await(streamTask)

try {
Tasks.await(coll.insertOne(rawDoc2))
assertEquals(2, Tasks.await(coll.count()))
Tasks.await(coll.updateMany(BsonDocument(), Document().append("\$set",
Document().append("new", "field"))))

val insertEvent = Tasks.await(stream.nextEvent())
assertEquals(OperationType.INSERT, insertEvent.operationType)
assertEquals(rawDoc2, insertEvent.fullDocument)
val updateEvent1 = Tasks.await(stream.nextEvent())
val updateEvent2 = Tasks.await(stream.nextEvent())

assertNotNull(updateEvent1)
assertNotNull(updateEvent2)

assertEquals(OperationType.UPDATE, updateEvent1.operationType)
assertEquals(rawDoc1.append("new", "field"), updateEvent1.fullDocument)
assertEquals(OperationType.UPDATE, updateEvent2.operationType)
assertEquals(rawDoc2.append("new", "field"), updateEvent2.fullDocument)
} finally {
stream.close()
}
}

@Test
fun testWatchObjectIdIDs() {
val coll = getTestColl()
assertEquals(0, Tasks.await(coll.count()))

val objectId1 = ObjectId()
val objectId2 = ObjectId()

val rawDoc1 = Document()
rawDoc1["_id"] = objectId1
rawDoc1["hello"] = "world"

val rawDoc2 = Document()
rawDoc2["_id"] = objectId2
rawDoc2["happy"] = "day"

Tasks.await(coll.insertOne(rawDoc1))
assertEquals(1, Tasks.await(coll.count()))

val streamTask = coll.watch(objectId1, objectId2)
val stream = Tasks.await(streamTask)

try {
Tasks.await(coll.insertOne(rawDoc2))
assertEquals(2, Tasks.await(coll.count()))
Tasks.await(coll.updateMany(BsonDocument(), Document().append("\$set",
Document().append("new", "field"))))

val insertEvent = Tasks.await(stream.nextEvent())
assertEquals(OperationType.INSERT, insertEvent.operationType)
assertEquals(rawDoc2, insertEvent.fullDocument)
val updateEvent1 = Tasks.await(stream.nextEvent())
val updateEvent2 = Tasks.await(stream.nextEvent())

assertNotNull(updateEvent1)
assertNotNull(updateEvent2)

assertEquals(OperationType.UPDATE, updateEvent1.operationType)
assertEquals(rawDoc1.append("new", "field"), updateEvent1.fullDocument)
assertEquals(OperationType.UPDATE, updateEvent2.operationType)
assertEquals(rawDoc2.append("new", "field"), updateEvent2.fullDocument)
} finally {
stream.close()
}
}

private fun withoutIds(documents: Collection<Document>): Collection<Document> {
val list = ArrayList<Document>(documents.size)
documents.forEach { list.add(withoutId(it)) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import com.mongodb.stitch.core.admin.services.ServiceConfigs
import com.mongodb.stitch.core.admin.services.rules.RuleCreator
import com.mongodb.stitch.core.admin.services.rules.RuleResponse
import com.mongodb.stitch.core.auth.providers.anonymous.AnonymousCredential
import com.mongodb.stitch.core.services.mongodb.remote.ExceptionListener
import com.mongodb.stitch.core.services.mongodb.remote.RemoteDeleteResult
import com.mongodb.stitch.core.services.mongodb.remote.RemoteInsertManyResult
import com.mongodb.stitch.core.services.mongodb.remote.RemoteInsertOneResult
import com.mongodb.stitch.core.services.mongodb.remote.RemoteUpdateResult
import com.mongodb.stitch.core.services.mongodb.remote.sync.ChangeEventListener
import com.mongodb.stitch.core.services.mongodb.remote.sync.ConflictHandler
import com.mongodb.stitch.core.services.mongodb.remote.sync.ErrorListener
import com.mongodb.stitch.core.services.mongodb.remote.sync.SyncDeleteResult
import com.mongodb.stitch.core.services.mongodb.remote.sync.SyncInsertManyResult
import com.mongodb.stitch.core.services.mongodb.remote.sync.SyncInsertOneResult
Expand Down Expand Up @@ -71,9 +71,9 @@ class SyncMongoClientIntTests : BaseStitchAndroidIntTest(), SyncIntTestRunner {
override fun configure(
conflictResolver: ConflictHandler<Document?>,
changeEventListener: ChangeEventListener<Document>?,
errorListener: ErrorListener?
exceptionListener: ExceptionListener?
) {
sync.configure(conflictResolver, changeEventListener, errorListener)
sync.configure(conflictResolver, changeEventListener, exceptionListener)
}

override fun syncOne(id: BsonValue) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2018-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.stitch.android.services.mongodb.remote;

import com.google.android.gms.tasks.Task;

import com.mongodb.stitch.android.core.internal.common.TaskDispatcher;
import com.mongodb.stitch.core.internal.net.StitchEvent;
import com.mongodb.stitch.core.internal.net.Stream;
import com.mongodb.stitch.core.services.mongodb.remote.ChangeEvent;
import com.mongodb.stitch.core.services.mongodb.remote.ChangeStream;

import java.io.IOException;
import java.util.concurrent.Callable;

/**
* An implementation of {@link com.mongodb.stitch.core.services.mongodb.remote.ChangeStream} that
* returns each event as a {@link Task}.
*
* @param <DocumentT> The type of the full document on the underlying change event to be returned
* asynchronously.
*/
public class AsyncChangeStream<DocumentT> extends
ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT> {
private final TaskDispatcher dispatcher;

/**
* Initializes a passthrough change stream with the provided underlying event stream.
*
* @param stream The event stream.
* @param dispatcher The event dispatcher.
*/
public AsyncChangeStream(final Stream<ChangeEvent<DocumentT>> stream,
final TaskDispatcher dispatcher) {
super(stream);
this.dispatcher = dispatcher;
}

/**
* Returns a {@link Task} whose resolution gives the next event from the underlying stream.
* @return task providing the next event
* @throws IOException if the underlying stream throws an {@link IOException}
*/
@Override
public Task<ChangeEvent<DocumentT>> nextEvent() throws IOException {
return dispatcher.dispatchTask(new Callable<ChangeEvent<DocumentT>>() {
@Override
public ChangeEvent<DocumentT> call() throws Exception {
final StitchEvent<ChangeEvent<DocumentT>> nextEvent = getStream().nextEvent();

if (nextEvent == null) {
return null;
}
if (nextEvent.getError() != null) {
dispatchError(nextEvent);
return null;
}
if (nextEvent.getData() == null) {
return null;
}

return nextEvent.getData();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@

import com.google.android.gms.tasks.Task;
import com.mongodb.MongoNamespace;
import com.mongodb.stitch.core.services.mongodb.remote.ChangeEvent;
import com.mongodb.stitch.core.services.mongodb.remote.ChangeStream;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteCountOptions;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteDeleteResult;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteInsertManyResult;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteInsertOneResult;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteUpdateOptions;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteUpdateResult;

import java.util.List;

import org.bson.BsonValue;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;

/**
* The RemoteMongoCollection interface.
Expand Down Expand Up @@ -241,6 +247,22 @@ Task<RemoteUpdateResult> updateMany(
final Bson update,
final RemoteUpdateOptions updateOptions);


/**
* Watches specified IDs in a collection. This convenience overload supports the use case
* of non-{@link BsonValue} instances of {@link ObjectId}.
* @param ids unique object identifiers of the IDs to watch.
* @return the stream of change events.
*/
Task<ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT>> watch(final ObjectId... ids);

/**
* Watches specified IDs in a collection.
* @param ids the ids to watch.
* @return the stream of change events.
*/
Task<ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT>> watch(final BsonValue... ids);

/**
* A set of synchronization related operations on this collection.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import android.support.annotation.Nullable;

import com.google.android.gms.tasks.Task;
import com.mongodb.stitch.core.services.mongodb.remote.ExceptionListener;
import com.mongodb.stitch.core.services.mongodb.remote.sync.ChangeEventListener;
import com.mongodb.stitch.core.services.mongodb.remote.sync.ConflictHandler;
import com.mongodb.stitch.core.services.mongodb.remote.sync.ErrorListener;
import com.mongodb.stitch.core.services.mongodb.remote.sync.SyncCountOptions;
import com.mongodb.stitch.core.services.mongodb.remote.sync.SyncDeleteResult;
import com.mongodb.stitch.core.services.mongodb.remote.sync.SyncInsertManyResult;
Expand All @@ -48,11 +48,11 @@ public interface Sync<DocumentT> {
* and remote events.
* @param changeEventListener the event listener to invoke when a change event happens for the
* document.
* @param errorListener the error listener to invoke when an irrecoverable error occurs
* @param exceptionListener the error listener to invoke when an irrecoverable error occurs
*/
void configure(@NonNull final ConflictHandler<DocumentT> conflictHandler,
@Nullable final ChangeEventListener<DocumentT> changeEventListener,
@Nullable final ErrorListener errorListener);
@Nullable final ExceptionListener exceptionListener);

/**
* Requests that the given document _id be synchronized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@
import com.google.android.gms.tasks.Task;
import com.mongodb.MongoNamespace;
import com.mongodb.stitch.android.core.internal.common.TaskDispatcher;
import com.mongodb.stitch.android.services.mongodb.remote.AsyncChangeStream;
import com.mongodb.stitch.android.services.mongodb.remote.RemoteAggregateIterable;
import com.mongodb.stitch.android.services.mongodb.remote.RemoteFindIterable;
import com.mongodb.stitch.android.services.mongodb.remote.RemoteMongoCollection;
import com.mongodb.stitch.android.services.mongodb.remote.Sync;
import com.mongodb.stitch.core.services.mongodb.remote.ChangeEvent;
import com.mongodb.stitch.core.services.mongodb.remote.ChangeStream;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteCountOptions;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteDeleteResult;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteInsertManyResult;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteInsertOneResult;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteUpdateOptions;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteUpdateResult;
import com.mongodb.stitch.core.services.mongodb.remote.internal.CoreRemoteMongoCollection;

import java.util.List;
import java.util.concurrent.Callable;

import org.bson.BsonValue;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;

public final class RemoteMongoCollectionImpl<DocumentT>
implements RemoteMongoCollection<DocumentT> {
Expand Down Expand Up @@ -344,6 +351,30 @@ public RemoteUpdateResult call() {
});
}

@Override
public Task<ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT>> watch(final ObjectId... ids) {
return dispatcher.dispatchTask(
new Callable<ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT>>() {
@Override
public ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT> call() throws Exception {
return new AsyncChangeStream<DocumentT>(proxy.watch(ids), dispatcher);
}
}
);
}

@Override
public Task<ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT>> watch(final BsonValue... ids) {
return dispatcher.dispatchTask(
new Callable<ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT>>() {
@Override
public ChangeStream<Task<ChangeEvent<DocumentT>>, DocumentT> call() throws Exception {
return new AsyncChangeStream<DocumentT>(proxy.watch(ids), dispatcher);
}
}
);
}

@Override
public Sync<DocumentT> sync() {
return this.sync;
Expand Down
Loading

0 comments on commit 9d6307d

Please sign in to comment.