diff --git a/CHANGELOG.md b/CHANGELOG.md index a22a703e..03a76802 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## unreleased + +* Fixed `CrudBatch` `hasMore` always returning false. +* Added `triggerImmediately` to `onChange` method. + ## 1.0.0-BETA32 * Added `onChange` method to the PowerSync client. This allows for observing table changes. diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 48283139..3e5f5f19 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -187,14 +187,22 @@ class DatabaseTest { fun testTableChangesUpdates() = databaseTest { turbineScope { - val query = database.onChange(tables = setOf("users")).testIn(this) + val query = + database + .onChange( + tables = setOf("users"), + ).testIn(this) database.execute( "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("Test", "test@example.org"), ) - val changeSet = query.awaitItem() + var changeSet = query.awaitItem() + // The initial result + changeSet.count() shouldBe 0 + + changeSet = query.awaitItem() changeSet.count() shouldBe 1 changeSet.contains("users") shouldBe true @@ -418,4 +426,37 @@ class DatabaseTest { database.getNextCrudTransaction() shouldBe null } + + @Test + fun testCrudBatch() = + databaseTest { + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("a", "a@example.org"), + ) + + database.writeTransaction { + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("b", "b@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("c", "c@example.org"), + ) + } + + // Purposely limit to less than the number of available ops + var batch = database.getCrudBatch(2) ?: error("Batch should not be null") + batch.hasMore shouldBe true + batch.crud shouldHaveSize 2 + batch.complete(null) + + batch = database.getCrudBatch(1000) ?: error("Batch should not be null") + batch.crud shouldHaveSize 1 + batch.hasMore shouldBe false + batch.complete(null) + + database.getCrudBatch() shouldBe null + } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 5a67b551..86aa3c76 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -258,10 +258,10 @@ internal class PowerSyncDatabaseImpl( return null } - val entries = + var entries = internalDb.getAll( "SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?", - listOf(limit.toLong()), + listOf(limit.toLong() + 1), ) { CrudEntry.fromRow( CrudRow( @@ -278,7 +278,7 @@ internal class PowerSyncDatabaseImpl( val hasMore = entries.size > limit if (hasMore) { - entries.dropLast(entries.size - limit) + entries = entries.dropLast(1) } return CrudBatch(entries, hasMore, complete = { writeCheckpoint -> @@ -351,11 +351,12 @@ internal class PowerSyncDatabaseImpl( override fun onChange( tables: Set, throttleMs: Long, + triggerImmediately: Boolean, ): Flow> = flow { waitReady() emitAll( - internalDb.onChange(tables, throttleMs), + internalDb.onChange(tables, throttleMs, triggerImmediately), ) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt index c8f3d870..0f41cb54 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt @@ -97,6 +97,7 @@ public interface Queries { * * @param tables The set of tables to monitor for changes. * @param throttleMs The minimum interval, in milliseconds, between emissions. Defaults to [DEFAULT_THROTTLE]. Table changes are accumulated while throttling is active. The accumulated set of tables will be emitted on the trailing edge of the throttle. + * @param triggerImmediately If true (default), the flow will immediately emit an empty set of tables when the flow is first collected. This can be useful for ensuring that the flow emits at least once, even if no changes occur to the monitored tables. * @return A [Flow] emitting the set of modified tables. * @throws PowerSyncException If a database error occurs. * @throws CancellationException If the operation is cancelled. @@ -105,6 +106,7 @@ public interface Queries { public fun onChange( tables: Set, throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds, + triggerImmediately: Boolean = true, ): Flow> /** diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 7477f4f8..539b698a 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -106,6 +106,7 @@ internal class InternalDatabaseImpl( override fun onChange( tables: Set, throttleMs: Long, + triggerImmediately: Boolean, ): Flow> = channelFlow { // Match all possible internal table combinations @@ -116,7 +117,12 @@ internal class InternalDatabaseImpl( val batchedUpdates = AtomicMutableSet() updatesOnTables() - .transform { updates -> + .onSubscription { + if (triggerImmediately) { + // Emit an initial event (if requested). No changes would be detected at this point + send(setOf()) + } + }.transform { updates -> val intersection = updates.intersect(watchedTables) if (intersection.isNotEmpty()) { // Transform table names using friendlyTableName