From ae49c22188af0b0086b69b14787809b96f82899e Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 29 Apr 2025 15:35:39 +0200 Subject: [PATCH 1/3] fix hasMore --- CHANGELOG.md | 5 +++ .../kotlin/com/powersync/DatabaseTest.kt | 32 +++++++++++++++++++ .../com/powersync/db/PowerSyncDatabaseImpl.kt | 9 +++--- .../kotlin/com/powersync/db/Queries.kt | 2 ++ .../db/internal/InternalDatabaseImpl.kt | 8 ++++- 5 files changed, 51 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a22a703e..cdffbfb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.0.0-BETA33 + +* Fixed `CrudBatch` `hasMore` always returning false. +* Added `triggerImmediately` to `onChange` method. + ## 1.0.0-BETA32 * Added `onChange` method to the PowerSync client. This allows for observing table changes. diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 48283139..a5d24e57 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -418,4 +418,36 @@ class DatabaseTest { database.getNextCrudTransaction() shouldBe null } + + @Test + fun testCrudBatch() = + databaseTest { + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("a", "a@example.org"), + ) + + database.writeTransaction { + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("b", "b@example.org"), + ) + it.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf("c", "c@example.org"), + ) + } + + // Purposely limit to less than the number of available ops + var batch = database.getCrudBatch(2) ?: error("Batch should not be null") + batch.hasMore shouldBe true + batch.crud shouldHaveSize 2 + batch.complete(null) + + batch = database.getCrudBatch(1000) ?: error("Batch should not be null") + batch.crud shouldHaveSize 1 + batch.complete(null) + + database.getCrudBatch() shouldBe null + } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 5a67b551..86aa3c76 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -258,10 +258,10 @@ internal class PowerSyncDatabaseImpl( return null } - val entries = + var entries = internalDb.getAll( "SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?", - listOf(limit.toLong()), + listOf(limit.toLong() + 1), ) { CrudEntry.fromRow( CrudRow( @@ -278,7 +278,7 @@ internal class PowerSyncDatabaseImpl( val hasMore = entries.size > limit if (hasMore) { - entries.dropLast(entries.size - limit) + entries = entries.dropLast(1) } return CrudBatch(entries, hasMore, complete = { writeCheckpoint -> @@ -351,11 +351,12 @@ internal class PowerSyncDatabaseImpl( override fun onChange( tables: Set, throttleMs: Long, + triggerImmediately: Boolean, ): Flow> = flow { waitReady() emitAll( - internalDb.onChange(tables, throttleMs), + internalDb.onChange(tables, throttleMs, triggerImmediately), ) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt index c8f3d870..0f41cb54 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt @@ -97,6 +97,7 @@ public interface Queries { * * @param tables The set of tables to monitor for changes. * @param throttleMs The minimum interval, in milliseconds, between emissions. Defaults to [DEFAULT_THROTTLE]. Table changes are accumulated while throttling is active. The accumulated set of tables will be emitted on the trailing edge of the throttle. + * @param triggerImmediately If true (default), the flow will immediately emit an empty set of tables when the flow is first collected. This can be useful for ensuring that the flow emits at least once, even if no changes occur to the monitored tables. * @return A [Flow] emitting the set of modified tables. * @throws PowerSyncException If a database error occurs. * @throws CancellationException If the operation is cancelled. @@ -105,6 +106,7 @@ public interface Queries { public fun onChange( tables: Set, throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds, + triggerImmediately: Boolean = true, ): Flow> /** diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 7477f4f8..539b698a 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -106,6 +106,7 @@ internal class InternalDatabaseImpl( override fun onChange( tables: Set, throttleMs: Long, + triggerImmediately: Boolean, ): Flow> = channelFlow { // Match all possible internal table combinations @@ -116,7 +117,12 @@ internal class InternalDatabaseImpl( val batchedUpdates = AtomicMutableSet() updatesOnTables() - .transform { updates -> + .onSubscription { + if (triggerImmediately) { + // Emit an initial event (if requested). No changes would be detected at this point + send(setOf()) + } + }.transform { updates -> val intersection = updates.intersect(watchedTables) if (intersection.isNotEmpty()) { // Transform table names using friendlyTableName From 4e7cbb14d4c328ca73e1456fb9b486afaa8bb4b5 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 29 Apr 2025 16:19:14 +0200 Subject: [PATCH 2/3] update test --- CHANGELOG.md | 2 +- .../kotlin/com/powersync/DatabaseTest.kt | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdffbfb8..03a76802 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 1.0.0-BETA33 +## unreleased * Fixed `CrudBatch` `hasMore` always returning false. * Added `triggerImmediately` to `onChange` method. diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index a5d24e57..bddd852d 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -187,14 +187,22 @@ class DatabaseTest { fun testTableChangesUpdates() = databaseTest { turbineScope { - val query = database.onChange(tables = setOf("users")).testIn(this) + val query = + database + .onChange( + tables = setOf("users"), + ).testIn(this) database.execute( "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("Test", "test@example.org"), ) - val changeSet = query.awaitItem() + var changeSet = query.awaitItem() + // The initial result + changeSet.count() shouldBe 0 + + changeSet = query.awaitItem() changeSet.count() shouldBe 1 changeSet.contains("users") shouldBe true From f2ce13e5d5e91b1e7fb4d4494d514e964aa1734e Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 29 Apr 2025 17:18:43 +0200 Subject: [PATCH 3/3] assert hasMore is false in test --- .../commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index bddd852d..3e5f5f19 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -454,6 +454,7 @@ class DatabaseTest { batch = database.getCrudBatch(1000) ?: error("Batch should not be null") batch.crud shouldHaveSize 1 + batch.hasMore shouldBe false batch.complete(null) database.getCrudBatch() shouldBe null