Skip to content

Commit 7e01ad2

Browse files
committed
Track progress for sync lines
1 parent cf050e9 commit 7e01ad2

File tree

5 files changed

+78
-9
lines changed

5 files changed

+78
-9
lines changed

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

+6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ export interface SyncLocalDatabaseResult {
3030
checkpointFailures?: string[];
3131
}
3232

33+
export type BucketOperationProgress = Record<string, {
34+
atLast: number;
35+
sinceLast: number;
36+
}>;
37+
3338
export interface BucketChecksum {
3439
bucket: string;
3540
priority?: number;
@@ -65,6 +70,7 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
6570
startSession(): void;
6671

6772
getBucketStates(): Promise<BucketState[]>;
73+
getBucketOperationProgress(): Promise<BucketOperationProgress>;
6874

6975
syncLocalDatabase(
7076
checkpoint: Checkpoint,

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

+16-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { BaseObserver } from '../../../utils/BaseObserver.js';
55
import { MAX_OP_ID } from '../../constants.js';
66
import {
77
BucketChecksum,
8+
BucketOperationProgress,
89
BucketState,
910
BucketStorageAdapter,
1011
BucketStorageListener,
@@ -91,6 +92,11 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
9192
return result;
9293
}
9394

95+
async getBucketOperationProgress(): Promise<BucketOperationProgress> {
96+
const rows = await this.db.getAll<{name: string, count_at_last: number, count_since_last: number}>("SELECT name, count_at_last, count_since_last FROM ps_buckets");
97+
return Object.fromEntries(rows.map((r) => [r.name, {atLast: r.count_at_last, sinceLast: r.count_since_last}]));
98+
}
99+
94100
async saveSyncData(batch: SyncDataBatch) {
95101
await this.writeTransaction(async (tx) => {
96102
let count = 0;
@@ -199,7 +205,16 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
199205
'sync_local',
200206
arg
201207
]);
202-
return result == 1;
208+
if (result == 1) {
209+
if (priority == null) {
210+
const bucketToCount = Object.fromEntries(checkpoint.buckets.map((b) => [b.bucket, b.count]));
211+
await tx.execute('UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name WHERE ?1->name IS NOT NULL', [JSON.stringify(bucketToCount)]);
212+
}
213+
214+
return true;
215+
} else {
216+
return false;
217+
}
203218
});
204219
}
205220

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

+53-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
isStreamingSyncData
2121
} from './streaming-sync-types.js';
2222
import { DataStream } from 'src/utils/DataStream.js';
23+
import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js';
2324

2425
export enum LockType {
2526
CRUD = 'crud',
@@ -163,21 +164,23 @@ export abstract class AbstractStreamingSyncImplementation
163164
protected streamingSyncPromise?: Promise<void>;
164165

165166
syncStatus: SyncStatus;
167+
private syncStatusOptions: SyncStatusOptions;
166168
triggerCrudUpload: () => void;
167169

168170
constructor(options: AbstractStreamingSyncImplementationOptions) {
169171
super();
170172
this.options = { ...DEFAULT_STREAMING_SYNC_OPTIONS, ...options };
171173

172-
this.syncStatus = new SyncStatus({
174+
this.syncStatusOptions = {
173175
connected: false,
174176
connecting: false,
175177
lastSyncedAt: undefined,
176178
dataFlow: {
177179
uploading: false,
178180
downloading: false
179181
}
180-
});
182+
};
183+
this.syncStatus = new SyncStatus(this.syncStatusOptions);
181184
this.abortController = null;
182185

183186
this.triggerCrudUpload = throttleLeadingTrailing(() => {
@@ -411,7 +414,8 @@ The next upload iteration will be delayed.`);
411414
connected: false,
412415
connecting: false,
413416
dataFlow: {
414-
downloading: false
417+
downloading: false,
418+
downloadProgress: null,
415419
}
416420
});
417421
});
@@ -569,6 +573,7 @@ The next upload iteration will be delayed.`);
569573
bucketMap = newBuckets;
570574
await this.options.adapter.removeBuckets([...bucketsToDelete]);
571575
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
576+
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);
572577
} else if (isStreamingSyncCheckpointComplete(line)) {
573578
this.logger.debug('Checkpoint complete', targetCheckpoint);
574579
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
@@ -588,7 +593,8 @@ The next upload iteration will be delayed.`);
588593
connected: true,
589594
lastSyncedAt: new Date(),
590595
dataFlow: {
591-
downloading: false
596+
downloading: false,
597+
downloadProgress: null,
592598
}
593599
});
594600
}
@@ -645,6 +651,7 @@ The next upload iteration will be delayed.`);
645651
write_checkpoint: diff.write_checkpoint
646652
};
647653
targetCheckpoint = newCheckpoint;
654+
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);
648655

649656
bucketMap = new Map();
650657
newBuckets.forEach((checksum, name) =>
@@ -662,9 +669,23 @@ The next upload iteration will be delayed.`);
662669
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
663670
} else if (isStreamingSyncData(line)) {
664671
const { data } = line;
672+
const previousProgress = this.syncStatusOptions.dataFlow?.downloadProgress;
673+
let updatedProgress: InternalProgressInformation | null = null;
674+
if (previousProgress) {
675+
updatedProgress = {...previousProgress};
676+
const progressForBucket = updatedProgress[data.bucket];
677+
if (progressForBucket) {
678+
updatedProgress[data.bucket] = {
679+
...progressForBucket,
680+
sinceLast: progressForBucket.sinceLast + data.data.length,
681+
};
682+
}
683+
}
684+
665685
this.updateSyncStatus({
666686
dataFlow: {
667-
downloading: true
687+
downloading: true,
688+
downloadProgress: updatedProgress,
668689
}
669690
});
670691
await this.options.adapter.saveSyncData({ buckets: [SyncDataBucket.fromRow(data)] });
@@ -707,7 +728,8 @@ The next upload iteration will be delayed.`);
707728
lastSyncedAt: new Date(),
708729
priorityStatusEntries: [],
709730
dataFlow: {
710-
downloading: false
731+
downloading: false,
732+
downloadProgress: null,
711733
}
712734
});
713735
}
@@ -721,6 +743,30 @@ The next upload iteration will be delayed.`);
721743
});
722744
}
723745

746+
private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) {
747+
const localProgress = await this.options.adapter.getBucketOperationProgress();
748+
const progress: InternalProgressInformation = {};
749+
750+
for (const bucket of checkpoint.buckets) {
751+
const savedProgress = localProgress[bucket.bucket];
752+
progress[bucket.bucket] = {
753+
// The fallback priority doesn't matter here, but 3 is the one newer versions of the sync service
754+
// will use by default.
755+
priority: bucket.priority ?? 3,
756+
atLast: savedProgress?.atLast ?? 0,
757+
sinceLast: savedProgress.sinceLast ?? 0,
758+
targetCount: bucket.count ?? 0,
759+
};
760+
}
761+
762+
this.updateSyncStatus({
763+
dataFlow: {
764+
downloading: true,
765+
downloadProgress: progress,
766+
}
767+
});
768+
}
769+
724770
protected updateSyncStatus(options: SyncStatusOptions) {
725771
const updatedStatus = new SyncStatus({
726772
connected: options.connected ?? this.syncStatus.connected,
@@ -734,6 +780,7 @@ The next upload iteration will be delayed.`);
734780
});
735781

736782
if (!this.syncStatus.isEqual(updatedStatus)) {
783+
this.syncStatusOptions = options;
737784
this.syncStatus = updatedStatus;
738785
// Only trigger this is there was a change
739786
this.iterateListeners((cb) => cb.statusChanged?.(updatedStatus));

packages/common/src/db/crud/SyncProgress.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { SyncStatus } from "./SyncStatus.js";
22

33
// (bucket, progress) pairs
4+
/** @internal */
45
export type InternalProgressInformation = Record<string, {
56
priority: number, // Priority of the associated buckets
67
atLast: number, // Total ops at last completed sync, or 0
@@ -9,7 +10,7 @@ export type InternalProgressInformation = Record<string, {
910
}>;
1011

1112
/**
12-
* The priority used by the core extension to indicate that a full sync was completed.
13+
* @internal The priority used by the core extension to indicate that a full sync was completed.
1314
*/
1415
export const FULL_SYNC_PRIORITY = 2147483647;
1516

packages/common/src/db/crud/SyncStatus.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export type SyncDataFlowStatus = Partial<{
88
*
99
* Please use the {@link SyncStatus#downloadProgress} property to track sync progress.
1010
*/
11-
downloadProgress: InternalProgressInformation,
11+
downloadProgress: InternalProgressInformation | null,
1212
}>;
1313

1414
export interface SyncPriorityStatus {

0 commit comments

Comments
 (0)