From 92bd2700061e2cd039da40d4a35ab4c8fb90efb1 Mon Sep 17 00:00:00 2001 From: Daniel Meyer <8926560+pubkey@users.noreply.github.com> Date: Mon, 30 Dec 2024 13:05:26 +0100 Subject: [PATCH] FIX handle conflict resolution schema errors (#6694) * FIX handle conflict resolution schema errors * FIX tests * FIX lint --- docs-src/docs/releases/16.0.0.md | 1 + src/plugins/test-utils/humans-collection.ts | 8 ++- src/plugins/test-utils/schemas.ts | 2 +- src/replication-protocol/downstream.ts | 3 +- src/replication-protocol/upstream.ts | 31 +++++++++--- test/unit/migration-schema.test.ts | 12 ++++- test/unit/replication.test.ts | 55 ++++++++++++++++++++- 7 files changed, 99 insertions(+), 13 deletions(-) diff --git a/docs-src/docs/releases/16.0.0.md b/docs-src/docs/releases/16.0.0.md index 2b0d8da6cad..c1158d8468a 100644 --- a/docs-src/docs/releases/16.0.0.md +++ b/docs-src/docs/releases/16.0.0.md @@ -78,6 +78,7 @@ I completely rewrote them and improved performance especially on initial load wh - Fix IndexedDB bug: Some people had problems with the IndexedDB RxStorage that opened up collections very slowly. If you had this problem, please try out this new version. - Add check to ensure remote instances are build with the same RxDB version. This is to ensure if you update RxDB and forget to rebuild your workers, it will throw instead of causing strange problems. - When the pulled documents in the replication do not match the schema, do not update the checkpoint. +- When the pushed document conflict results in the replication do not match the schema, do not update the checkpoint. ## Other diff --git a/src/plugins/test-utils/humans-collection.ts b/src/plugins/test-utils/humans-collection.ts index b2a878c5a43..355f94c6fb7 100644 --- a/src/plugins/test-utils/humans-collection.ts +++ b/src/plugins/test-utils/humans-collection.ts @@ -12,7 +12,8 @@ import { randomToken, MigrationStrategies, RxAttachmentCreator, - RxStorage + RxStorage, + RxConflictHandler } from '../../index.ts'; import { HumanDocumentType } from './schemas.ts'; @@ -342,7 +343,9 @@ export async function createHumanWithTimestamp( amount = 0, databaseName = randomToken(10), multiInstance = true, - storage = getConfig().storage.getStorage() + storage = getConfig().storage.getStorage(), + conflictHandler?: RxConflictHandler + ): Promise> { const db = await createRxDatabase<{ humans: RxCollection; }>({ @@ -355,6 +358,7 @@ export async function createHumanWithTimestamp( // setTimeout(() => db.close(), dbLifetime); const collections = await db.addCollections({ humans: { + conflictHandler, schema: schemas.humanWithTimestamp } }); diff --git a/src/plugins/test-utils/schemas.ts b/src/plugins/test-utils/schemas.ts index 53c92b64024..a8e0fa9cfe2 100644 --- a/src/plugins/test-utils/schemas.ts +++ b/src/plugins/test-utils/schemas.ts @@ -157,7 +157,7 @@ export const simpleHumanV3: RxJsonSchema = overwritab title: 'human schema', version: 3, keyCompression: false, - description: 'describes a simple human being', + description: 'describes a simple human being (V3 with age as number)', type: 'object', primaryKey: 'passportId', properties: { diff --git a/src/replication-protocol/downstream.ts b/src/replication-protocol/downstream.ts index 4386bea2c2a..865c982f13e 100644 --- a/src/replication-protocol/downstream.ts +++ b/src/replication-protocol/downstream.ts @@ -487,8 +487,7 @@ export async function startReplicationDownstream( conflictWriteFork, 'replication-up-write-conflict' ); - /** - * Errors in the forkWriteResult must not be handled - * because they have been caused by a write to the forkInstance - * in between which will anyway trigger a new upstream cycle - * that will then resolved the conflict again. - */ + + let mustThrow: RxError | undefined; + forkWriteResult.error.forEach(error => { + /** + * Conflict-Errors in the forkWriteResult must not be handled + * because they have been caused by a write to the forkInstance + * in between which will anyway trigger a new upstream cycle + * that will then resolved the conflict again. + */ + if (error.status === 409) { + return; + } + // other non-conflict errors must be handled + const throwMe = newRxError('RC_PUSH', { + writeError: error + }); + state.events.error.next(throwMe); + mustThrow = throwMe; + }); + if (mustThrow) { + throw mustThrow; + } + const useMetaWrites: BulkWriteRow>[] = []; const success = getWrittenDocumentsFromBulkWriteResponse( state.primaryPath, diff --git a/test/unit/migration-schema.test.ts b/test/unit/migration-schema.test.ts index a8f1120c090..52a361e007d 100644 --- a/test/unit/migration-schema.test.ts +++ b/test/unit/migration-schema.test.ts @@ -740,7 +740,17 @@ describe('migration-schema.test.ts', function () { handler: helper.masterChangesSince }, push: { - handler: helper.masterWrite + handler(rows) { + rows = rows.map(row => { + if (row.assumedMasterState) { + row.assumedMasterState.age = row.assumedMasterState.age + ''; + } + row.newDocumentState.age = row.newDocumentState.age + ''; + return row; + }); + const result = helper.masterWrite(rows); + return result; + } } }); ensureReplicationHasNoErrors(replicationState2); diff --git a/test/unit/replication.test.ts b/test/unit/replication.test.ts index 811a9650769..17b33091ca8 100644 --- a/test/unit/replication.test.ts +++ b/test/unit/replication.test.ts @@ -43,7 +43,8 @@ import { requestIdlePromise, prepareQuery, addRxPlugin, - getLastCheckpointDoc + getLastCheckpointDoc, + defaultConflictHandler } from '../../plugins/core/index.mjs'; import { @@ -401,6 +402,58 @@ describe('replication.test.ts', () => { remoteCollection.database.close(); otherSchemaCollection.database.close(); }); + it('should not update the push checkpoint when the conflict handler returns invalid documents that do not match the schema', async () => { + const localCollection = await humansCollection.createHumanWithTimestamp(0, randomToken(10), false, undefined, { + isEqual: defaultConflictHandler.isEqual, + resolve(i) { + const ret = clone(i.realMasterState); + ret.additionalField = 'foobar'; + return ret; + } + }); + const remoteCollection = await humansCollection.createHumanWithTimestamp(0, randomToken(10), false); + + // add one that conflicts + await Promise.all([localCollection, remoteCollection].map((c, i) => c.insert({ + id: 'conflicting-doc', + name: 'myname', + updatedAt: 1001, + age: i + }))); + + const replicationState = replicateRxCollection({ + collection: localCollection as any, + replicationIdentifier: REPLICATION_IDENTIFIER_TEST, + live: true, + pull: { + handler: getPullHandler(remoteCollection) + }, + push: { + handler: getPushHandler(remoteCollection) + }, + retryTime: 100 + }); + const errors: (RxError | RxTypeError)[] = []; + replicationState.error$.subscribe(err => errors.push(err)); + await replicationState.awaitInitialReplication(); + await replicationState.awaitInSync(); + + await wait(isFastMode() ? 0 : 100); + + assert.ok(errors.length > 0); + assert.ok(JSON.stringify(errors[0].parameters).includes('additionalField')); + + // when handling the push failed, it should not have the checkpoint updated + const pushCheckpointAfter = await getLastCheckpointDoc( + ensureNotFalsy(replicationState.internalReplicationState), + 'up' + ); + assert.ok(!pushCheckpointAfter); + + + localCollection.database.close(); + remoteCollection.database.close(); + }); it('should never resolve awaitInitialReplication() on erroring replication', async () => { const { localCollection, remoteCollection } = await getTestCollections({ local: 10, remote: 10 }); const replicationState = replicateRxCollection({