Skip to content

Commit

Permalink
FIX handle conflict resolution schema errors (#6694)
Browse files Browse the repository at this point in the history
* FIX handle conflict resolution schema errors

* FIX tests

* FIX lint
  • Loading branch information
pubkey authored Dec 30, 2024
1 parent 89e991f commit 92bd270
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs-src/docs/releases/16.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ I completely rewrote them and improved performance especially on initial load wh
- Fix IndexedDB bug: Some people had problems with the IndexedDB RxStorage that opened up collections very slowly. If you had this problem, please try out this new version.
- Add check to ensure remote instances are build with the same RxDB version. This is to ensure if you update RxDB and forget to rebuild your workers, it will throw instead of causing strange problems.
- When the pulled documents in the replication do not match the schema, do not update the checkpoint.
- When the pushed document conflict results in the replication do not match the schema, do not update the checkpoint.

## Other

Expand Down
8 changes: 6 additions & 2 deletions src/plugins/test-utils/humans-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import {
randomToken,
MigrationStrategies,
RxAttachmentCreator,
RxStorage
RxStorage,
RxConflictHandler
} from '../../index.ts';

import { HumanDocumentType } from './schemas.ts';
Expand Down Expand Up @@ -342,7 +343,9 @@ export async function createHumanWithTimestamp(
amount = 0,
databaseName = randomToken(10),
multiInstance = true,
storage = getConfig().storage.getStorage()
storage = getConfig().storage.getStorage(),
conflictHandler?: RxConflictHandler<any>

): Promise<RxCollection<schemaObjects.HumanWithTimestampDocumentType>> {

const db = await createRxDatabase<{ humans: RxCollection<schemaObjects.HumanWithTimestampDocumentType>; }>({
Expand All @@ -355,6 +358,7 @@ export async function createHumanWithTimestamp(
// setTimeout(() => db.close(), dbLifetime);
const collections = await db.addCollections({
humans: {
conflictHandler,
schema: schemas.humanWithTimestamp
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/test-utils/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export const simpleHumanV3: RxJsonSchema<SimpleHumanV3DocumentType> = overwritab
title: 'human schema',
version: 3,
keyCompression: false,
description: 'describes a simple human being',
description: 'describes a simple human being (V3 with age as number)',
type: 'object',
primaryKey: 'passportId',
properties: {
Expand Down
3 changes: 1 addition & 2 deletions src/replication-protocol/downstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,7 @@ export async function startReplicationDownstream<RxDocType, CheckpointType = any
writeError: error
});
state.events.error.next(throwMe);
mustThrow =
mustThrow = throwMe;
mustThrow = throwMe;
});
if (mustThrow) {
throw mustThrow;
Expand Down
31 changes: 25 additions & 6 deletions src/replication-protocol/upstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
ById,
EventBulk,
RxDocumentData,
RxError,
RxReplicationWriteToMasterRow,
RxStorageChangeEvent,
RxStorageInstanceReplicationState,
Expand Down Expand Up @@ -40,6 +41,7 @@ import {
getMetaWriteRow
} from './meta-instance.ts';
import { fillWriteDataForAttachmentsChange } from '../plugins/attachments/index.ts';
import { newRxError } from '../rx-error.ts';

/**
* Writes all document changes from the fork to the master.
Expand Down Expand Up @@ -472,12 +474,29 @@ export async function startReplicationUpstream<RxDocType, CheckpointType>(
conflictWriteFork,
'replication-up-write-conflict'
);
/**
* Errors in the forkWriteResult must not be handled
* because they have been caused by a write to the forkInstance
* in between which will anyway trigger a new upstream cycle
* that will then resolved the conflict again.
*/

let mustThrow: RxError | undefined;
forkWriteResult.error.forEach(error => {
/**
* Conflict-Errors in the forkWriteResult must not be handled
* because they have been caused by a write to the forkInstance
* in between which will anyway trigger a new upstream cycle
* that will then resolved the conflict again.
*/
if (error.status === 409) {
return;
}
// other non-conflict errors must be handled
const throwMe = newRxError('RC_PUSH', {
writeError: error
});
state.events.error.next(throwMe);
mustThrow = throwMe;
});
if (mustThrow) {
throw mustThrow;
}

const useMetaWrites: BulkWriteRow<RxStorageReplicationMeta<RxDocType, any>>[] = [];
const success = getWrittenDocumentsFromBulkWriteResponse(
state.primaryPath,
Expand Down
12 changes: 11 additions & 1 deletion test/unit/migration-schema.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,17 @@ describe('migration-schema.test.ts', function () {
handler: helper.masterChangesSince
},
push: {
handler: helper.masterWrite
handler(rows) {
rows = rows.map(row => {
if (row.assumedMasterState) {
row.assumedMasterState.age = row.assumedMasterState.age + '';
}
row.newDocumentState.age = row.newDocumentState.age + '';
return row;
});
const result = helper.masterWrite(rows);
return result;
}
}
});
ensureReplicationHasNoErrors(replicationState2);
Expand Down
55 changes: 54 additions & 1 deletion test/unit/replication.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import {
requestIdlePromise,
prepareQuery,
addRxPlugin,
getLastCheckpointDoc
getLastCheckpointDoc,
defaultConflictHandler
} from '../../plugins/core/index.mjs';

import {
Expand Down Expand Up @@ -401,6 +402,58 @@ describe('replication.test.ts', () => {
remoteCollection.database.close();
otherSchemaCollection.database.close();
});
it('should not update the push checkpoint when the conflict handler returns invalid documents that do not match the schema', async () => {
const localCollection = await humansCollection.createHumanWithTimestamp(0, randomToken(10), false, undefined, {
isEqual: defaultConflictHandler.isEqual,
resolve(i) {
const ret = clone(i.realMasterState);
ret.additionalField = 'foobar';
return ret;
}
});
const remoteCollection = await humansCollection.createHumanWithTimestamp(0, randomToken(10), false);

// add one that conflicts
await Promise.all([localCollection, remoteCollection].map((c, i) => c.insert({
id: 'conflicting-doc',
name: 'myname',
updatedAt: 1001,
age: i
})));

const replicationState = replicateRxCollection({
collection: localCollection as any,
replicationIdentifier: REPLICATION_IDENTIFIER_TEST,
live: true,
pull: {
handler: getPullHandler(remoteCollection)
},
push: {
handler: getPushHandler(remoteCollection)
},
retryTime: 100
});
const errors: (RxError | RxTypeError)[] = [];
replicationState.error$.subscribe(err => errors.push(err));
await replicationState.awaitInitialReplication();
await replicationState.awaitInSync();

await wait(isFastMode() ? 0 : 100);

assert.ok(errors.length > 0);
assert.ok(JSON.stringify(errors[0].parameters).includes('additionalField'));

// when handling the push failed, it should not have the checkpoint updated
const pushCheckpointAfter = await getLastCheckpointDoc(
ensureNotFalsy(replicationState.internalReplicationState),
'up'
);
assert.ok(!pushCheckpointAfter);


localCollection.database.close();
remoteCollection.database.close();
});
it('should never resolve awaitInitialReplication() on erroring replication', async () => {
const { localCollection, remoteCollection } = await getTestCollections({ local: 10, remote: 10 });
const replicationState = replicateRxCollection({
Expand Down

0 comments on commit 92bd270

Please sign in to comment.