Skip to content

Commit 4164c41

Browse files
committed
Try testing sync with mock fetch
1 parent 7e01ad2 commit 4164c41

File tree

8 files changed

+547
-508
lines changed

8 files changed

+547
-508
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import {
3232
type PowerSyncConnectionOptions,
3333
type RequiredAdditionalConnectionOptions
3434
} from './sync/stream/AbstractStreamingSyncImplementation.js';
35-
import { FULL_SYNC_PRIORITY } from 'src/db/crud/SyncProgress.js';
35+
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
3636

3737
export interface DisconnectAndClearOptions {
3838
/** When set to false, data in local-only tables is preserved. */

packages/node/package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@
5454
"comlink": "^4.4.2"
5555
},
5656
"devDependencies": {
57+
"@powersync/drizzle-driver": "workspace:*",
5758
"@types/async-lock": "^1.4.0",
5859
"drizzle-orm": "^0.35.2",
59-
"@powersync/drizzle-driver": "workspace:*",
60+
"fetch-mock": "^12.5.2",
6061
"rollup": "4.14.3",
6162
"typescript": "^5.5.3",
6263
"vitest": "^3.0.5"

packages/node/src/db/PowerSyncDatabase.ts

+13-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import {
22
AbstractPowerSyncDatabase,
3+
AbstractRemoteOptions,
34
AbstractStreamingSyncImplementation,
45
BucketStorageAdapter,
56
DBAdapter,
7+
DEFAULT_REMOTE_LOGGER,
68
PowerSyncBackendConnector,
79
PowerSyncDatabaseOptions,
810
PowerSyncDatabaseOptionsWithSettings,
@@ -18,6 +20,12 @@ import { NodeSQLOpenOptions } from './options.js';
1820

1921
export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
2022
database: DBAdapter | SQLOpenFactory | NodeSQLOpenOptions;
23+
/**
24+
* Options to override how the SDK will connect to the sync service.
25+
*
26+
* This option is intended to be used for internal tests.
27+
*/
28+
remoteOptions?: Partial<AbstractRemoteOptions>,
2129
};
2230

2331
/**
@@ -57,7 +65,11 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
5765
protected generateSyncStreamImplementation(
5866
connector: PowerSyncBackendConnector
5967
): AbstractStreamingSyncImplementation {
60-
const remote = new NodeRemote(connector);
68+
const remote = new NodeRemote(
69+
connector,
70+
DEFAULT_REMOTE_LOGGER,
71+
(this.options as NodePowerSyncDatabaseOptions).remoteOptions,
72+
);
6173

6274
return new NodeStreamingSyncImplementation({
6375
adapter: this.bucketStorageAdapter,

packages/node/src/sync/stream/NodeRemote.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ export class NodeRemote extends AbstractRemote {
2828
options?: Partial<AbstractRemoteOptions>
2929
) {
3030
super(connector, logger, {
31+
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider(),
3132
...(options ?? {}),
32-
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider()
3333
});
3434
}
3535

packages/node/tests/PowerSyncDatabase.test.ts

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import * as path from 'node:path';
2-
import * as fs from 'node:fs/promises';
32
import { Worker } from 'node:worker_threads';
43

54
import { vi, expect, test } from 'vitest';

packages/node/tests/sync.test.ts

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import { describe, vi, expect, beforeEach } from 'vitest';
2+
3+
import { connectedDatabaseTest, MockSyncService, TestConnector, waitForSyncStatus } from "./utils";
4+
import { AbstractPowerSyncDatabase, BucketChecksum, OplogEntryJSON } from '@powersync/common';
5+
6+
describe('Sync', () => {
7+
describe('reports progress', () => {
8+
let lastOpId = 0;
9+
10+
beforeEach(() => {
11+
lastOpId = 0;
12+
});
13+
14+
function pushDataLine(service: MockSyncService, bucket: string, amount: number) {
15+
const data: OplogEntryJSON[] = [];
16+
for (let i = 0; i < amount; i++) {
17+
data.push({
18+
op_id: `${++lastOpId}`,
19+
op: 'PUT',
20+
object_type: bucket,
21+
object_id: `${lastOpId}`,
22+
checksum: 0,
23+
data: '{}',
24+
});
25+
}
26+
27+
service.pushLine({data: {
28+
bucket,
29+
data,
30+
}});
31+
}
32+
33+
function pushCheckpointComplete(service: MockSyncService, priority?: number) {
34+
if (priority != null) {
35+
service.pushLine({
36+
partial_checkpoint_complete: {
37+
last_op_id: `${lastOpId}`,
38+
priority,
39+
},
40+
});
41+
} else {
42+
service.pushLine({
43+
checkpoint_complete: {
44+
last_op_id: `${lastOpId}`,
45+
},
46+
});
47+
}
48+
}
49+
50+
connectedDatabaseTest('without priorities', async ({database, syncService}) => {
51+
await database.connect(new TestConnector());
52+
await vi.waitFor(() => expect(syncService.connectedListeners).toBe(1));
53+
54+
syncService.pushLine({checkpoint: {
55+
last_op_id: '10',
56+
buckets: [bucket('a', 10)]
57+
}});
58+
59+
await waitForProgress(database, [0, 10]);
60+
61+
pushDataLine(syncService, 'a', 10);
62+
await waitForProgress(database, [10, 10]);
63+
64+
pushCheckpointComplete(syncService);
65+
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
66+
67+
// Emit new data, progress should be 0/2 instead of 10/12
68+
syncService.pushLine({checkpoint_diff: {
69+
last_op_id: '12',
70+
updated_buckets: [bucket('a', 12)],
71+
removed_buckets: [],
72+
write_checkpoint: '',
73+
}});
74+
await waitForProgress(database, [0, 2]);
75+
pushDataLine(syncService, 'a', 2);
76+
await waitForProgress(database, [2, 2]);
77+
pushCheckpointComplete(syncService);
78+
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
79+
});
80+
});
81+
});
82+
83+
function bucket(name: string, count: number, priority: number = 3): BucketChecksum {
84+
return {
85+
bucket: name,
86+
count,
87+
checksum: 0,
88+
priority,
89+
};
90+
}
91+
92+
async function waitForProgress(database: AbstractPowerSyncDatabase, total: [number, number]) {
93+
await waitForSyncStatus(database, (status) => {
94+
const progress = status.downloadProgress;
95+
if (!progress) {
96+
return false;
97+
}
98+
99+
const untilCompletion = progress.untilCompletion;
100+
return untilCompletion.completed == total[0] && untilCompletion.total == total[1];
101+
});
102+
}

packages/node/tests/utils.ts

+105-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import os from 'node:os';
22
import fs from 'node:fs/promises';
33
import path from 'node:path';
4+
import { defaultFetchMockConfig, FetchMock } from 'fetch-mock';
45
import { test } from 'vitest';
5-
import { column, PowerSyncDatabase, Schema, Table } from '../lib';
6+
import { AbstractPowerSyncDatabase, AbstractRemoteOptions, column, NodePowerSyncDatabaseOptions, PowerSyncBackendConnector, PowerSyncCredentials, PowerSyncDatabase, Schema, StreamingSyncLine, SyncStatus, Table } from '../lib';
67

78
export async function createTempDir() {
89
const ostmpdir = os.tmpdir();
@@ -37,15 +38,115 @@ export const tempDirectoryTest = test.extend<{ tmpdir: string }>({
3738
}
3839
});
3940

40-
export const databaseTest = tempDirectoryTest.extend<{ database: PowerSyncDatabase }>({
41-
database: async ({ tmpdir }, use) => {
41+
function createDatabaseFixture(options: Partial<NodePowerSyncDatabaseOptions> = {}) {
42+
return async ({ tmpdir }, use) => {
4243
const database = new PowerSyncDatabase({
44+
...options,
4345
schema: AppSchema,
4446
database: {
4547
dbFilename: 'test.db',
4648
dbLocation: tmpdir
4749
}
4850
});
4951
await use(database);
50-
}
52+
await database.close();
53+
};
54+
}
55+
56+
export const databaseTest = tempDirectoryTest.extend<{ database: PowerSyncDatabase }>({
57+
database: async ({tmpdir}, use) => {
58+
await createDatabaseFixture()({tmpdir}, use);
59+
},
60+
});
61+
62+
// TODO: Unify this with the test setup for the web SDK.
63+
export const mockSyncServiceTest = tempDirectoryTest.extend<{syncService: MockSyncService}>({
64+
syncService: async ({}, use) => {
65+
// Don't install global fetch mocks, we want tests to be isolated!
66+
const fetchMock = new FetchMock(defaultFetchMockConfig);
67+
const listeners: ReadableStreamDefaultController<StreamingSyncLine>[] = [];
68+
69+
fetchMock.route('path:/sync/stream', async () => {
70+
let thisController: ReadableStreamDefaultController<StreamingSyncLine> | null = null;
71+
72+
const syncLines = new ReadableStream<StreamingSyncLine>({
73+
start(controller) {
74+
thisController = controller;
75+
listeners.push(controller);
76+
},
77+
cancel() {
78+
listeners.splice(listeners.indexOf(thisController!), 1);
79+
},
80+
});
81+
82+
83+
const encoder = new TextEncoder();
84+
const asLines = new TransformStream<StreamingSyncLine, Uint8Array>({
85+
transform: (chunk, controller) => {
86+
const line = `${JSON.stringify(chunk)}\n`;
87+
controller.enqueue(encoder.encode(line));
88+
},
89+
});
90+
91+
return new Response(syncLines.pipeThrough(asLines), {status: 200});
92+
});
93+
fetchMock.catch(404);
94+
95+
await use({
96+
clientOptions: {
97+
fetchImplementation: fetchMock.fetchHandler.bind(fetchMock),
98+
},
99+
get connectedListeners() {
100+
return listeners.length;
101+
},
102+
pushLine(line) {
103+
for (const listener of listeners) {
104+
listener.enqueue(line);
105+
}
106+
},
107+
});
108+
},
109+
});
110+
111+
export const connectedDatabaseTest = mockSyncServiceTest.extend<{ database: PowerSyncDatabase }>({
112+
database: async ({ tmpdir, syncService }, use) => {
113+
const fixture = createDatabaseFixture({remoteOptions: syncService.clientOptions});
114+
await fixture({ tmpdir }, use);
115+
},
51116
});
117+
118+
export interface MockSyncService {
119+
clientOptions: Partial<AbstractRemoteOptions>,
120+
pushLine: (line: StreamingSyncLine) => void,
121+
connectedListeners: number,
122+
}
123+
124+
export class TestConnector implements PowerSyncBackendConnector {
125+
async fetchCredentials(): Promise<PowerSyncCredentials> {
126+
return {
127+
endpoint: '',
128+
token: ''
129+
};
130+
}
131+
async uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
132+
const tx = await database.getNextCrudTransaction();
133+
await tx?.complete();
134+
}
135+
}
136+
137+
export function waitForSyncStatus(database: AbstractPowerSyncDatabase, matcher: (status: SyncStatus) => boolean): Promise<void> {
138+
return new Promise((resolve) => {
139+
if (matcher(database.currentStatus)) {
140+
return resolve();
141+
}
142+
143+
const dispose = database.registerListener({
144+
statusChanged: (status) => {
145+
if (matcher(status)) {
146+
dispose();
147+
resolve();
148+
}
149+
}
150+
});
151+
});
152+
}

0 commit comments

Comments
 (0)