diff --git a/CHANGELOG.md b/CHANGELOG.md index 03a76802..9dfaacaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ * Fixed `CrudBatch` `hasMore` always returning false. * Added `triggerImmediately` to `onChange` method. +* Report real-time progress information about downloads through `SyncStatus.downloadProgress`. +* Compose: Add `composeState()` extension method on `SyncStatus`. ## 1.0.0-BETA32 diff --git a/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt b/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt new file mode 100644 index 00000000..74a48ef6 --- /dev/null +++ b/compose/src/commonMain/kotlin/com/powersync/compose/DatabaseState.kt @@ -0,0 +1,10 @@ +package com.powersync.compose + +import androidx.compose.runtime.Composable +import androidx.compose.runtime.State +import androidx.compose.runtime.collectAsState +import com.powersync.sync.SyncStatus +import com.powersync.sync.SyncStatusData + +@Composable +public fun SyncStatus.composeState(): State = asFlow().collectAsState(initial = this) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt similarity index 100% rename from core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt rename to core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt new file mode 100644 index 00000000..e7fe1d57 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -0,0 +1,343 @@ +package com.powersync.sync + +import app.cash.turbine.ReceiveTurbine +import app.cash.turbine.turbineScope +import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.BucketPriority +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.OpType +import com.powersync.bucket.OplogEntry +import com.powersync.testutils.ActiveDatabaseTest +import com.powersync.testutils.databaseTest +import com.powersync.testutils.waitFor +import kotlinx.coroutines.channels.Channel +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class SyncProgressTest { + private var lastOpId = 0 + + @BeforeTest + fun resetOpId() { + lastOpId = 0 + } + + private fun bucket( + name: String, + count: Int, + priority: BucketPriority = BucketPriority(3), + ): BucketChecksum = + BucketChecksum( + bucket = name, + priority = priority, + checksum = 0, + count = count, + ) + + private suspend fun ActiveDatabaseTest.addDataLine( + bucket: String, + amount: Int, + ) { + syncLines.send( + SyncLine.SyncDataBucket( + bucket = bucket, + data = + List(amount) { + OplogEntry( + checksum = 0, + opId = (++lastOpId).toString(), + op = OpType.PUT, + rowId = lastOpId.toString(), + rowType = bucket, + data = "{}", + ) + }, + after = null, + nextAfter = null, + ), + ) + } + + private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) { + if (priority != null) { + syncLines.send( + SyncLine.CheckpointPartiallyComplete( + lastOpId = lastOpId.toString(), + priority = priority, + ), + ) + } else { + syncLines.send(SyncLine.CheckpointComplete(lastOpId = lastOpId.toString())) + } + } + + private suspend fun ReceiveTurbine.expectProgress( + total: Pair, + priorities: Map> = emptyMap(), + ) { + val item = awaitItem() + val progress = item.downloadProgress ?: error("Expected download progress on $item") + + assertTrue { item.downloading } + assertEquals(total.first, progress.downloadedOperations) + assertEquals(total.second, progress.totalOperations) + + priorities.forEach { (priority, expected) -> + val (expectedDownloaded, expectedTotal) = expected + val progress = progress.untilPriority(priority) + assertEquals(expectedDownloaded, progress.downloadedOperations) + assertEquals(expectedTotal, progress.totalOperations) + } + } + + private suspend fun ReceiveTurbine.expectNotDownloading() { + awaitItem().also { + assertFalse { it.downloading } + assertNull(it.downloadProgress) + } + } + + @Test + fun withoutPriorities() = + databaseTest { + database.connect(connector) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with 10 ops, progress should be 0/10 + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) + turbine.expectProgress(0 to 10) + + addDataLine("a", 10) + turbine.expectProgress(10 to 10) + + addCheckpointComplete() + turbine.expectNotDownloading() + + // Emit new data, progress should be 0/2 instead of 10/12 + syncLines.send( + SyncLine.CheckpointDiff( + lastOpId = "12", + updatedBuckets = listOf(bucket("a", 12)), + removedBuckets = emptyList(), + ), + ) + turbine.expectProgress(0 to 2) + + addDataLine("a", 2) + turbine.expectProgress(2 to 2) + + addCheckpointComplete() + turbine.expectNotDownloading() + + turbine.cancel() + } + + database.close() + syncLines.close() + } + + @Test + fun interruptedSync() = + databaseTest { + database.connect(connector) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with 10 ops, progress should be 0/10 + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) + turbine.expectProgress(0 to 10) + + addDataLine("a", 5) + turbine.expectProgress(5 to 10) + + turbine.cancel() + } + + // Emulate the app closing + database.close() + syncLines.close() + + // And reconnecting + database = openDatabase() + syncLines = Channel() + database.connect(connector) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send the same checkpoint as before + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) + + // Progress should be restored: 5 / 10 instead of 0/5 + turbine.expectProgress(5 to 10) + + addDataLine("a", 5) + turbine.expectProgress(10 to 10) + addCheckpointComplete() + turbine.expectNotDownloading() + + turbine.cancel() + } + + database.close() + syncLines.close() + } + + @Test + fun interruptedSyncWithNewCheckpoint() = + databaseTest { + database.connect(connector) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) + turbine.expectProgress(0 to 10) + + addDataLine("a", 5) + turbine.expectProgress(5 to 10) + + turbine.cancel() + } + + // Close and re-connect + database.close() + syncLines.close() + database = openDatabase() + syncLines = Channel() + database.connect(connector) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Send checkpoint with two more ops + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "12", + checksums = listOf(bucket("a", 12)), + ), + ), + ) + + turbine.expectProgress(5 to 12) + + addDataLine("a", 7) + turbine.expectProgress(12 to 12) + addCheckpointComplete() + turbine.expectNotDownloading() + + turbine.cancel() + } + + database.close() + syncLines.close() + } + + @Test + fun differentPriorities() = + databaseTest { + database.connect(connector) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + suspend fun expectProgress( + prio0: Pair, + prio2: Pair, + ) { + turbine.expectProgress(prio2, mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2)) + } + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = + listOf( + bucket("a", 5, BucketPriority(0)), + bucket("b", 5, BucketPriority(2)), + ), + ), + ), + ) + expectProgress(0 to 5, 0 to 10) + + addDataLine("a", 5) + expectProgress(5 to 5, 5 to 10) + + addCheckpointComplete(BucketPriority(0)) + expectProgress(5 to 5, 5 to 10) + + addDataLine("b", 2) + expectProgress(5 to 5, 7 to 10) + + // Before syncing b fully, send a new checkpoint + syncLines.send( + SyncLine.CheckpointDiff( + lastOpId = "14", + updatedBuckets = + listOf( + bucket("a", 8, BucketPriority(0)), + bucket("b", 6, BucketPriority(2)), + ), + removedBuckets = emptyList(), + ), + ) + expectProgress(5 to 8, 7 to 14) + + addDataLine("a", 3) + expectProgress(8 to 8, 10 to 14) + addDataLine("b", 4) + expectProgress(8 to 8, 14 to 14) + + addCheckpointComplete() + turbine.expectNotDownloading() + + turbine.cancel() + } + + database.close() + syncLines.close() + } +} diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 7560d8cb..f4353eac 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -82,7 +82,7 @@ internal class ActiveDatabaseTest( ), ) - val syncLines = Channel() + var syncLines = Channel() var checkpointResponse: () -> WriteCheckpointResponse = { WriteCheckpointResponse(WriteCheckpointData("1000")) } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 654bc3c7..ab278ee0 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -29,6 +29,8 @@ internal interface BucketStorage { suspend fun getBucketStates(): List + suspend fun getBucketOperationProgress(): Map + suspend fun removeBuckets(bucketsToDelete: List) suspend fun hasCompletedSync(): Boolean diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 3f647b29..75a23dfa 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -152,6 +152,22 @@ internal class BucketStorageImpl( }, ) + override suspend fun getBucketOperationProgress(): Map = + buildMap { + val rows = + db.getAll("SELECT name, count_at_last, count_since_last FROM ps_buckets") { cursor -> + cursor.getString(0)!! to + LocalOperationCounters( + atLast = cursor.getLong(1)!!.toInt(), + sinceLast = cursor.getLong(2)!!.toInt(), + ) + } + + for ((name, counters) in rows) { + put(name, counters) + } + } + override suspend fun removeBuckets(bucketsToDelete: List) { bucketsToDelete.forEach { bucketName -> deleteBucket(bucketName) @@ -305,7 +321,6 @@ internal class BucketStorageImpl( } return db.writeTransaction { tx -> - tx.execute( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", listOf("sync_local", args), @@ -316,7 +331,28 @@ internal class BucketStorageImpl( cursor.getLong(0)!! } - return@writeTransaction res == 1L + val didApply = res == 1L + if (didApply && priority == null) { + // Reset progress counters. We only do this for a complete sync, as we want a download progress to + // always cover a complete checkpoint instead of resetting for partial completions. + tx.execute( + """ + UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name + WHERE name != '${'$'}local' AND ?1->name IS NOT NULL + """.trimIndent(), + listOf( + JsonUtil.json.encodeToString( + buildMap { + for (bucket in checkpoint.checksums) { + bucket.count?.let { put(bucket.bucket, it) } + } + }, + ), + ), + ) + } + + return@writeTransaction didApply } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt b/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt new file mode 100644 index 00000000..73f6a6fc --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/bucket/LocalOperationCounters.kt @@ -0,0 +1,6 @@ +package com.powersync.bucket + +internal data class LocalOperationCounters( + val atLast: Int, + val sinceLast: Int, +) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 86aa3c76..d58156e2 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -14,6 +14,7 @@ import com.powersync.db.crud.CrudRow import com.powersync.db.crud.CrudTransaction import com.powersync.db.internal.InternalDatabaseImpl import com.powersync.db.internal.InternalTable +import com.powersync.db.internal.PowerSyncVersion import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable import com.powersync.sync.PriorityStatusEntry @@ -217,21 +218,7 @@ internal class PowerSyncDatabaseImpl( } launch { - stream.status.asFlow().collect { - currentStatus.update( - connected = it.connected, - connecting = it.connecting, - uploading = it.uploading, - downloading = it.downloading, - lastSyncedAt = it.lastSyncedAt, - hasSynced = it.hasSynced, - uploadError = it.uploadError, - downloadError = it.downloadError, - clearDownloadError = it.downloadError == null, - clearUploadError = it.uploadError == null, - priorityStatusEntries = it.priorityStatusEntries, - ) - } + currentStatus.trackOther(stream.status) } launch { @@ -437,11 +424,14 @@ internal class PowerSyncDatabaseImpl( syncSupervisorJob = null } - currentStatus.update( - connected = false, - connecting = false, - lastSyncedAt = currentStatus.lastSyncedAt, - ) + currentStatus.update { + copy( + connected = false, + connecting = false, + downloading = false, + downloadProgress = null, + ) + } } override suspend fun disconnectAndClear(clearLocal: Boolean) { @@ -450,7 +440,7 @@ internal class PowerSyncDatabaseImpl( internalDb.writeTransaction { tx -> tx.getOptional("SELECT powersync_clear(?)", listOf(if (clearLocal) "1" else "0")) {} } - currentStatus.update(lastSyncedAt = null, hasSynced = false) + currentStatus.update { copy(lastSyncedAt = null, hasSynced = false) } } private suspend fun updateHasSynced() { @@ -490,11 +480,13 @@ internal class PowerSyncDatabaseImpl( } } - currentStatus.update( - hasSynced = lastSyncedAt != null, - lastSyncedAt = lastSyncedAt, - priorityStatusEntries = priorityStatus, - ) + currentStatus.update { + copy( + hasSynced = lastSyncedAt != null, + lastSyncedAt = lastSyncedAt, + priorityStatusEntries = priorityStatus, + ) + } } override suspend fun waitForFirstSync() = waitForFirstSyncImpl(null) @@ -534,20 +526,9 @@ internal class PowerSyncDatabaseImpl( * Check that a supported version of the powersync extension is loaded. */ private fun checkVersion(powerSyncVersion: String) { - // Parse version - val versionInts: List = - try { - powerSyncVersion - .split(Regex("[./]")) - .take(3) - .map { it.toInt() } - } catch (e: Exception) { - throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $powerSyncVersion. Details: $e") - } - - // Validate ^0.2.0 - if (versionInts[0] != 0 || versionInts[1] < 2 || versionInts[2] < 0) { - throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $powerSyncVersion") + val version = PowerSyncVersion.parse(powerSyncVersion) + if (version < PowerSyncVersion.MINIMUM) { + PowerSyncVersion.mismatchError(powerSyncVersion) } } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt new file mode 100644 index 00000000..c9781cb7 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncVersion.kt @@ -0,0 +1,49 @@ +package com.powersync.db.internal + +internal data class PowerSyncVersion( + val major: Int, + val minor: Int, + val patch: Int, +) : Comparable { + override fun compareTo(other: PowerSyncVersion): Int = + when (val compareMajor = major.compareTo(other.major)) { + 0 -> + when (val compareMinor = minor.compareTo(other.minor)) { + 0 -> patch.compareTo(other.patch) + else -> compareMinor + } + else -> compareMajor + } + + override fun toString(): String = "$major.$minor.$patch" + + companion object { + val MINIMUM: PowerSyncVersion = PowerSyncVersion(0, 3, 14) + + fun parse(from: String): PowerSyncVersion { + val versionInts: List = + try { + from + .split(Regex("[./]")) + .take(3) + .map { it.toInt() } + } catch (e: Exception) { + mismatchError(from, e.toString()) + } + + return PowerSyncVersion(versionInts[0], versionInts[1], versionInts[2]) + } + + fun mismatchError( + actualVersion: String, + details: String? = null, + ): Nothing { + var message = "Unsupported PowerSync extension version (need ^$MINIMUM, got $actualVersion)." + if (details != null) { + message = " Details: $details" + } + + throw Exception(message) + } + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt new file mode 100644 index 00000000..8c6b64e0 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -0,0 +1,146 @@ +package com.powersync.sync + +import com.powersync.bucket.BucketPriority +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.LocalOperationCounters + +/** + * Information about a progressing download. + * + * This reports the [totalOperations] amount of operations to download, how many of them have already been + * [downloadedOperations] and finally a [fraction] indicating relative progress. + * + * To obtain a [ProgressWithOperations] instance, use a method on [SyncDownloadProgress] which in turn is available + * on [SyncStatusData]. + */ +public interface ProgressWithOperations { + /** + * How many operations need to be downloaded in total for the current download to complete. + */ + public val totalOperations: Int + + /** + * How many operations, out of [totalOperations], have already been downloaded. + */ + public val downloadedOperations: Int + + /** + * The relative amount of [totalOperations] to items in [downloadedOperations], as a number between `0.0` and `1.0` (inclusive). + * + * When this number reaches `1.0`, all changes have been received from the sync service. + * Actually applying these changes happens before the [SyncStatusData.downloadProgress] field is + * cleared though, so progress can stay at `1.0` for a short while before completing. + */ + public val fraction: Float get() { + if (totalOperations == 0) { + return 0.0f + } + + return downloadedOperations.toFloat() / totalOperations.toFloat() + } +} + +internal data class ProgressInfo( + override val downloadedOperations: Int, + override val totalOperations: Int, +) : ProgressWithOperations + +/** + * Provides realtime progress on how PowerSync is downloading rows. + * + * This type reports progress by implementing [ProgressWithOperations], meaning that the [totalOperations], + * [downloadedOperations] and [fraction] getters are available on this instance. + * Additionally, it's possible to obtain the progress towards a specific priority only (instead of tracking progress for + * the entire download) by using [untilPriority]. + * + * The reported progress always reflects the status towards the end of a sync iteration (after which a consistent + * snapshot of all buckets is available locally). + * + * In rare cases (in particular, when a [compacting](https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets) + * operation takes place between syncs), it's possible for the returned numbers to be slightly inaccurate. For this + * reason, [SyncDownloadProgress] should be seen as an approximation of progress. The information returned is good + * enough to build progress bars, but not exact enough to track individual download counts. + * + * Also note that data is downloaded in bulk, which means that individual counters are unlikely to be updated + * one-by-one. + */ +@ConsistentCopyVisibility +public data class SyncDownloadProgress private constructor( + private val buckets: Map, +) : ProgressWithOperations { + override val downloadedOperations: Int + override val totalOperations: Int + + init { + val (target, completed) = targetAndCompletedCounts(BucketPriority.FULL_SYNC_PRIORITY) + totalOperations = target + downloadedOperations = completed + } + + /** + * Creates download progress information from the local progress counters since the last full sync and the target + * checkpoint. + */ + internal constructor(localProgress: Map, target: Checkpoint) : this( + buildMap { + for (entry in target.checksums) { + val savedProgress = localProgress[entry.bucket] + + put( + entry.bucket, + BucketProgress( + priority = entry.priority, + atLast = savedProgress?.atLast ?: 0, + sinceLast = savedProgress?.sinceLast ?: 0, + targetCount = entry.count ?: 0, + ), + ) + } + }, + ) + + /** + * Returns download progress towards all data up until the specified [priority] being received. + * + * The returned [ProgressWithOperations] instance tracks the target amount of operations that need to be downloaded + * in total and how many of them have already been received. + */ + public fun untilPriority(priority: BucketPriority): ProgressWithOperations { + val (total, completed) = targetAndCompletedCounts(priority) + return ProgressInfo(totalOperations = total, downloadedOperations = completed) + } + + internal fun incrementDownloaded(batch: SyncDataBatch): SyncDownloadProgress = + SyncDownloadProgress( + buildMap { + putAll(this@SyncDownloadProgress.buckets) + + for (bucket in batch.buckets) { + val previous = get(bucket.bucket) ?: continue + put( + bucket.bucket, + previous.copy( + sinceLast = previous.sinceLast + bucket.data.size, + ), + ) + } + }, + ) + + private fun targetAndCompletedCounts(priority: BucketPriority): Pair = + buckets.values + .asSequence() + .filter { it.priority >= priority } + .fold(0 to 0) { (prevTarget, prevCompleted), entry -> + (prevTarget + entry.total) to (prevCompleted + entry.sinceLast) + } +} + +private data class BucketProgress( + val priority: BucketPriority, + val atLast: Int, + val sinceLast: Int, + val targetCount: Int, +) { + val total get(): Int = targetCount - atLast +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index ce4fde57..067080b9 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -5,6 +5,7 @@ import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.datetime.Clock import kotlinx.datetime.Instant @ConsistentCopyVisibility @@ -36,6 +37,15 @@ public interface SyncStatusData { */ public val downloading: Boolean + /** + * Realtime progress information about downloaded operations during an active sync. + * + * + * For more information on what progress is reported, see [SyncDownloadProgress]. + * This value will be non-null only if [downloading] is true. + */ + public val downloadProgress: SyncDownloadProgress? + /** * true if uploading changes */ @@ -106,6 +116,7 @@ internal data class SyncStatusDataContainer( override val connected: Boolean = false, override val connecting: Boolean = false, override val downloading: Boolean = false, + override val downloadProgress: SyncDownloadProgress? = null, override val uploading: Boolean = false, override val lastSyncedAt: Instant? = null, override val hasSynced: Boolean? = null, @@ -115,6 +126,21 @@ internal data class SyncStatusDataContainer( ) : SyncStatusData { override val anyError get() = downloadError ?: uploadError + + internal fun abortedDownload() = + copy( + downloading = false, + downloadProgress = null, + ) + + internal fun copyWithCompletedDownload() = + copy( + lastSyncedAt = Clock.System.now(), + downloading = false, + downloadProgress = null, + hasSynced = true, + downloadError = null, + ) } @ConsistentCopyVisibility @@ -131,34 +157,17 @@ public data class SyncStatus internal constructor( /** * Updates the internal sync status indicators and emits Flow updates */ - internal fun update( - connected: Boolean? = null, - connecting: Boolean? = null, - downloading: Boolean? = null, - uploading: Boolean? = null, - hasSynced: Boolean? = null, - lastSyncedAt: Instant? = null, - uploadError: Any? = null, - downloadError: Any? = null, - clearUploadError: Boolean = false, - clearDownloadError: Boolean = false, - priorityStatusEntries: List? = null, - ) { - data = - data.copy( - connected = connected ?: data.connected, - connecting = connecting ?: data.connecting, - downloading = downloading ?: data.downloading, - uploading = uploading ?: data.uploading, - lastSyncedAt = lastSyncedAt ?: data.lastSyncedAt, - hasSynced = hasSynced ?: data.hasSynced, - priorityStatusEntries = priorityStatusEntries ?: data.priorityStatusEntries, - uploadError = if (clearUploadError) null else uploadError, - downloadError = if (clearDownloadError) null else downloadError, - ) + internal inline fun update(makeCopy: SyncStatusDataContainer.() -> SyncStatusDataContainer) { + data = data.makeCopy() stateFlow.value = data } + internal suspend fun trackOther(source: SyncStatus) { + source.stateFlow.collect { + update { it } + } + } + override val anyError: Any? get() = data.anyError @@ -171,6 +180,9 @@ public data class SyncStatus internal constructor( override val downloading: Boolean get() = data.downloading + override val downloadProgress: SyncDownloadProgress? + get() = data.downloadProgress + override val uploading: Boolean get() = data.uploading diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 56859443..81556310 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -72,7 +72,7 @@ internal class SyncStream( var invalidCredentials = false clientId = bucketStorage.getClientId() while (true) { - status.update(connecting = true) + status.update { copy(connecting = true) } try { if (invalidCredentials) { // This may error. In that case it will be retried again on the next @@ -87,15 +87,9 @@ internal class SyncStream( } logger.e("Error in streamingSync: ${e.message}") - status.update( - downloadError = e, - ) + status.update { copy(downloadError = e) } } finally { - status.update( - connected = false, - connecting = true, - downloading = false, - ) + status.update { copy(connected = false, connecting = true, downloading = false) } delay(retryDelayMs) } } @@ -142,7 +136,7 @@ internal class SyncStream( } checkedCrudItem = nextCrudItem - status.update(uploading = true) + status.update { copy(uploading = true) } uploadCrud() } else { // Uploading is completed @@ -150,7 +144,7 @@ internal class SyncStream( break } } catch (e: Exception) { - status.update(uploading = false, uploadError = e) + status.update { copy(uploading = false, uploadError = e) } if (e is CancellationException) { throw e @@ -161,7 +155,7 @@ internal class SyncStream( break } } - status.update(uploading = false) + status.update { copy(uploading = false) } } private suspend fun getWriteCheckpoint(): String { @@ -215,7 +209,7 @@ internal class SyncStream( throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}") } - status.update(connected = true, connecting = false) + status.update { copy(connected = true, connecting = false) } val channel: ByteReadChannel = httpResponse.body() while (!channel.isClosedForRead) { @@ -260,7 +254,7 @@ internal class SyncStream( } } - status.update(downloading = false) + status.update { abortedDownload() } return state } @@ -294,7 +288,6 @@ internal class SyncStream( ): SyncStreamState { val (checkpoint) = line state.targetCheckpoint = checkpoint - status.update(downloading = true) val bucketsToDelete = state.bucketSet!!.toMutableList() val newBuckets = mutableSetOf() @@ -306,15 +299,30 @@ internal class SyncStream( } } - if (bucketsToDelete.size > 0) { + state.bucketSet = newBuckets + startTrackingCheckpoint(checkpoint, bucketsToDelete) + + return state + } + + private suspend fun startTrackingCheckpoint( + checkpoint: Checkpoint, + bucketsToDelete: List, + ) { + val progress = bucketStorage.getBucketOperationProgress() + status.update { + copy( + downloading = true, + downloadProgress = SyncDownloadProgress(progress, checkpoint), + ) + } + + if (bucketsToDelete.isNotEmpty()) { logger.i { "Removing buckets [${bucketsToDelete.joinToString(separator = ", ")}]" } } - state.bucketSet = newBuckets bucketStorage.removeBuckets(bucketsToDelete) bucketStorage.setTargetCheckpoint(checkpoint) - - return state } private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState { @@ -343,12 +351,7 @@ internal class SyncStream( logger.i { "validated checkpoint ${state.appliedCheckpoint}" } state.validatedCheckpoint = state.targetCheckpoint - status.update( - lastSyncedAt = Clock.System.now(), - downloading = false, - hasSynced = true, - clearDownloadError = true, - ) + status.update { copyWithCompletedDownload() } } else { logger.d { "Could not apply checkpoint. Waiting for next sync complete line" } } @@ -376,20 +379,22 @@ internal class SyncStream( logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" } } - status.update( - priorityStatusEntries = - buildList { - // All states with a higher priority can be deleted since this partial sync includes them. - addAll(status.priorityStatusEntries.filter { it.priority >= line.priority }) - add( - PriorityStatusEntry( - priority = priority, - lastSyncedAt = Clock.System.now(), - hasSynced = true, - ), - ) - }, - ) + status.update { + copy( + priorityStatusEntries = + buildList { + // All states with a higher priority can be deleted since this partial sync includes them. + addAll(status.priorityStatusEntries.filter { it.priority >= line.priority }) + add( + PriorityStatusEntry( + priority = priority, + lastSyncedAt = Clock.System.now(), + hasSynced = true, + ), + ) + }, + ) + } return state } @@ -402,8 +407,6 @@ internal class SyncStream( throw Exception("Checkpoint diff without previous checkpoint") } - status.update(downloading = true) - val newBuckets = mutableMapOf() state.targetCheckpoint!!.checksums.forEach { checksum -> @@ -423,15 +426,7 @@ internal class SyncStream( ) state.targetCheckpoint = newCheckpoint - - state.bucketSet = newBuckets.keys.toMutableSet() - - val bucketsToDelete = checkpointDiff.removedBuckets - if (bucketsToDelete.isNotEmpty()) { - logger.d { "Remove buckets $bucketsToDelete" } - } - bucketStorage.removeBuckets(bucketsToDelete) - bucketStorage.setTargetCheckpoint(state.targetCheckpoint!!) + startTrackingCheckpoint(newCheckpoint, checkpointDiff.removedBuckets) return state } @@ -440,8 +435,9 @@ internal class SyncStream( data: SyncLine.SyncDataBucket, state: SyncStreamState, ): SyncStreamState { - status.update(downloading = true) - bucketStorage.saveSyncData(SyncDataBatch(listOf(data))) + val batch = SyncDataBatch(listOf(data)) + status.update { copy(downloading = true, downloadProgress = downloadProgress?.incrementDownloaded(batch)) } + bucketStorage.saveSyncData(batch) return state } diff --git a/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt new file mode 100644 index 00000000..c43e7c51 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/sync/ProgressTest.kt @@ -0,0 +1,15 @@ +package com.powersync.sync + +import kotlin.test.Test +import kotlin.test.assertEquals + +class ProgressTest { + @Test + fun reportsFraction() { + assertEquals(0.0f, ProgressInfo(0, 10).fraction) + assertEquals(0.5f, ProgressInfo(5, 10).fraction) + assertEquals(1.0f, ProgressInfo(10, 10).fraction) + + assertEquals(0.0f, ProgressInfo(0, 0).fraction) + } +} diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index b9011a46..558ada3c 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -33,6 +33,7 @@ import io.ktor.client.HttpClient import io.ktor.client.engine.mock.MockEngine import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.job import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout @@ -80,6 +81,7 @@ class SyncStreamTest { checkpointValid = true, checkpointFailures = emptyList(), ) + everySuspend { getBucketOperationProgress() } returns mapOf() } connector = mock { @@ -147,7 +149,7 @@ class SyncStreamTest { scope = this, ) - syncStream.status.update(connected = true) + syncStream.status.update { copy(connected = true) } syncStream.triggerCrudUploadAsync().join() testLogWriter.assertCount(2) @@ -295,6 +297,7 @@ class SyncStreamTest { verifySuspend(order) { if (priorityNo == 0) { + bucketStorage.getBucketOperationProgress() bucketStorage.removeBuckets(any()) bucketStorage.setTargetCheckpoint(any()) } diff --git a/demos/supabase-todolist/settings.gradle.kts b/demos/supabase-todolist/settings.gradle.kts index 73016629..3e4cd6c1 100644 --- a/demos/supabase-todolist/settings.gradle.kts +++ b/demos/supabase-todolist/settings.gradle.kts @@ -44,15 +44,18 @@ val useReleasedVersions = localProperties.getProperty("USE_RELEASED_POWERSYNC_VE if (!useReleasedVersions) { includeBuild("../..") { dependencySubstitution { - substitute(module("com.powersync:core")) - .using(project(":core")) - .because("we want to auto-wire up sample dependency") - substitute(module("com.powersync:persistence")) - .using(project(":persistence")) - .because("we want to auto-wire up sample dependency") - substitute(module("com.powersync:connector-supabase")) - .using(project(":connectors:supabase")) - .because("we want to auto-wire up sample dependency") + val replacements = mapOf( + "core" to "core", + "persistence" to "persistence", + "connector-supabase" to "connectors:supabase", + "compose" to "compose" + ) + + replacements.forEach { (moduleName, projectName) -> + substitute(module("com.powersync:$moduleName")) + .using(project(":$projectName")) + .because("we want to auto-wire up sample dependency") + } } } } diff --git a/demos/supabase-todolist/shared/build.gradle.kts b/demos/supabase-todolist/shared/build.gradle.kts index f2a59ab1..708ed52c 100644 --- a/demos/supabase-todolist/shared/build.gradle.kts +++ b/demos/supabase-todolist/shared/build.gradle.kts @@ -44,6 +44,7 @@ kotlin { // at: https://central.sonatype.com/artifact/com.powersync/core api("com.powersync:core:latest.release") implementation("com.powersync:connector-supabase:latest.release") + implementation("com.powersync:compose:latest.release") implementation(libs.uuid) implementation(compose.runtime) implementation(compose.foundation) diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt index c1a1cb95..cb7141c1 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/App.kt @@ -5,7 +5,6 @@ import androidx.compose.material.MaterialTheme import androidx.compose.runtime.Composable import androidx.compose.runtime.LaunchedEffect import androidx.compose.runtime.collectAsState -import androidx.compose.runtime.derivedStateOf import androidx.compose.runtime.getValue import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.remember @@ -13,7 +12,7 @@ import androidx.compose.runtime.rememberUpdatedState import androidx.compose.ui.Modifier import com.powersync.DatabaseDriverFactory import com.powersync.PowerSyncDatabase -import com.powersync.bucket.BucketPriority +import com.powersync.compose.composeState import com.powersync.connector.supabase.SupabaseConnector import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.demos.components.EditDialog @@ -25,7 +24,6 @@ import com.powersync.demos.screens.HomeScreen import com.powersync.demos.screens.SignInScreen import com.powersync.demos.screens.SignUpScreen import com.powersync.demos.screens.TodosScreen -import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.runBlocking import org.koin.compose.KoinApplication import org.koin.compose.koinInject @@ -71,19 +69,8 @@ fun AppContent( db: PowerSyncDatabase = koinInject(), modifier: Modifier = Modifier, ) { - // Debouncing the status flow prevents flicker - val status by db.currentStatus - .asFlow() - .debounce(200) - .collectAsState(initial = db.currentStatus) - - // This assumes that the buckets for lists has a priority of 1 (but it will work fine with sync - // rules not defining any priorities at all too). When giving lists a higher priority than - // items, we can have a consistent snapshot of lists without items. In the case where many items - // exist (that might take longer to sync initially), this allows us to display lists earlier. - val hasSyncedLists by remember { - derivedStateOf { status.statusForPriority(BucketPriority(1)).hasSynced } - } + + val status by db.currentStatus.composeState() val authViewModel = koinViewModel() val navController = koinInject() diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt new file mode 100644 index 00000000..52e5b521 --- /dev/null +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt @@ -0,0 +1,81 @@ +package com.powersync.demos.components + +import androidx.compose.foundation.background +import androidx.compose.foundation.layout.Arrangement +import androidx.compose.foundation.layout.Column +import androidx.compose.foundation.layout.fillMaxSize +import androidx.compose.foundation.layout.fillMaxWidth +import androidx.compose.foundation.layout.padding +import androidx.compose.material.LinearProgressIndicator +import androidx.compose.material.MaterialTheme +import androidx.compose.material.Text +import androidx.compose.runtime.Composable +import androidx.compose.runtime.getValue +import androidx.compose.ui.Alignment +import androidx.compose.ui.Modifier +import androidx.compose.ui.unit.dp +import com.powersync.PowerSyncDatabase +import com.powersync.bucket.BucketPriority +import com.powersync.compose.composeState +import com.powersync.sync.SyncStatusData +import org.koin.compose.koinInject + +/** + * A component that renders its [content] only after a first complete sync was completed on [db]. + * + * Before that, a progress indicator is shown instead. + */ +@Composable +fun GuardBySync( + db: PowerSyncDatabase = koinInject(), + priority: BucketPriority? = null, + content: @Composable () -> Unit +) { + val state: SyncStatusData by db.currentStatus.composeState() + + if (state.hasSynced == true) { + content() + return + } + + Column( + modifier = Modifier.fillMaxSize().background(MaterialTheme.colors.background), + horizontalAlignment = Alignment.CenterHorizontally, + verticalArrangement = Arrangement.Center, + ) { + // When we have no hasSynced information, the database is still being opened. We just show a + // generic progress bar in that case. + val databaseOpening = state.hasSynced == null + + if (!databaseOpening) { + Text( + text = "Busy with initial sync...", + style = MaterialTheme.typography.h6, + ) + } + + val progress = state.downloadProgress?.let { + if (priority == null) { + it + } else { + it.untilPriority(priority) + } + } + if (progress != null) { + LinearProgressIndicator( + modifier = Modifier.fillMaxWidth().padding(8.dp), + progress = progress.fraction, + ) + + if (progress.downloadedOperations == progress.totalOperations) { + Text("Applying server-side changes...") + } else { + Text("Downloaded ${progress.downloadedOperations} out of ${progress.totalOperations}.") + } + } else { + LinearProgressIndicator( + modifier = Modifier.fillMaxWidth().padding(8.dp), + ) + } + } +} diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt index c543be58..4e53852e 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt @@ -16,7 +16,9 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp +import com.powersync.bucket.BucketPriority import com.powersync.demos.Screen +import com.powersync.demos.components.GuardBySync import com.powersync.demos.components.Input import com.powersync.demos.components.ListContent import com.powersync.demos.components.Menu @@ -57,34 +59,25 @@ internal fun HomeScreen( }, ) - when { - syncStatus.hasSynced == null || syncStatus.hasSynced == false -> { - Box( - modifier = Modifier.fillMaxSize().background(MaterialTheme.colors.background), - contentAlignment = Alignment.Center, - ) { - Text( - text = "Busy with initial sync...", - style = MaterialTheme.typography.h6, - ) - } - } + // This assumes that the buckets for lists has a priority of 1 (but it will work fine with + // sync rules not defining any priorities at all too). When giving lists a higher priority + // than items, we can have a consistent snapshot of lists without items. In the case where + // many items exist (that might take longer to sync initially), this allows us to display + // lists earlier. + GuardBySync(priority = BucketPriority(1)) { + Input( + text = inputText, + onAddClicked = onAddItemClicked, + onTextChanged = onInputTextChanged, + screen = Screen.Home, + ) - else -> { - Input( - text = inputText, - onAddClicked = onAddItemClicked, - onTextChanged = onInputTextChanged, - screen = Screen.Home, + Box(Modifier.weight(1F)) { + ListContent( + items = items, + onItemClicked = onItemClicked, + onItemDeleteClicked = onItemDeleteClicked, ) - - Box(Modifier.weight(1F)) { - ListContent( - items = items, - onItemClicked = onItemClicked, - onItemDeleteClicked = onItemDeleteClicked, - ) - } } } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index bd5f8496..a617db08 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,7 +17,7 @@ kotlinx-datetime = "0.6.2" kotlinx-io = "0.5.4" ktor = "3.0.1" uuid = "0.8.2" -powersync-core = "0.3.12" +powersync-core = "0.3.14" sqlite-jdbc = "3.49.1.0" sqliter = "1.3.1" turbine = "1.2.0"