Skip to content

Commit c015326

Browse files
authored
Merge pull request #155 from powersync-ja/sync-progress
Report download progress
2 parents e3996e7 + af8a6af commit c015326

File tree

21 files changed

+839
-173
lines changed

21 files changed

+839
-173
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
* Fixed `CrudBatch` `hasMore` always returning false.
66
* Added `triggerImmediately` to `onChange` method.
7+
* Report real-time progress information about downloads through `SyncStatus.downloadProgress`.
8+
* Compose: Add `composeState()` extension method on `SyncStatus`.
79

810
## 1.0.0-BETA32
911

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.powersync.compose
2+
3+
import androidx.compose.runtime.Composable
4+
import androidx.compose.runtime.State
5+
import androidx.compose.runtime.collectAsState
6+
import com.powersync.sync.SyncStatus
7+
import com.powersync.sync.SyncStatusData
8+
9+
@Composable
10+
public fun SyncStatus.composeState(): State<SyncStatusData> = asFlow().collectAsState(initial = this)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
package com.powersync.sync
2+
3+
import app.cash.turbine.ReceiveTurbine
4+
import app.cash.turbine.turbineScope
5+
import com.powersync.bucket.BucketChecksum
6+
import com.powersync.bucket.BucketPriority
7+
import com.powersync.bucket.Checkpoint
8+
import com.powersync.bucket.OpType
9+
import com.powersync.bucket.OplogEntry
10+
import com.powersync.testutils.ActiveDatabaseTest
11+
import com.powersync.testutils.databaseTest
12+
import com.powersync.testutils.waitFor
13+
import kotlinx.coroutines.channels.Channel
14+
import kotlin.test.BeforeTest
15+
import kotlin.test.Test
16+
import kotlin.test.assertEquals
17+
import kotlin.test.assertFalse
18+
import kotlin.test.assertNull
19+
import kotlin.test.assertTrue
20+
21+
class SyncProgressTest {
22+
private var lastOpId = 0
23+
24+
@BeforeTest
25+
fun resetOpId() {
26+
lastOpId = 0
27+
}
28+
29+
private fun bucket(
30+
name: String,
31+
count: Int,
32+
priority: BucketPriority = BucketPriority(3),
33+
): BucketChecksum =
34+
BucketChecksum(
35+
bucket = name,
36+
priority = priority,
37+
checksum = 0,
38+
count = count,
39+
)
40+
41+
private suspend fun ActiveDatabaseTest.addDataLine(
42+
bucket: String,
43+
amount: Int,
44+
) {
45+
syncLines.send(
46+
SyncLine.SyncDataBucket(
47+
bucket = bucket,
48+
data =
49+
List(amount) {
50+
OplogEntry(
51+
checksum = 0,
52+
opId = (++lastOpId).toString(),
53+
op = OpType.PUT,
54+
rowId = lastOpId.toString(),
55+
rowType = bucket,
56+
data = "{}",
57+
)
58+
},
59+
after = null,
60+
nextAfter = null,
61+
),
62+
)
63+
}
64+
65+
private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) {
66+
if (priority != null) {
67+
syncLines.send(
68+
SyncLine.CheckpointPartiallyComplete(
69+
lastOpId = lastOpId.toString(),
70+
priority = priority,
71+
),
72+
)
73+
} else {
74+
syncLines.send(SyncLine.CheckpointComplete(lastOpId = lastOpId.toString()))
75+
}
76+
}
77+
78+
private suspend fun ReceiveTurbine<SyncStatusData>.expectProgress(
79+
total: Pair<Int, Int>,
80+
priorities: Map<BucketPriority, Pair<Int, Int>> = emptyMap(),
81+
) {
82+
val item = awaitItem()
83+
val progress = item.downloadProgress ?: error("Expected download progress on $item")
84+
85+
assertTrue { item.downloading }
86+
assertEquals(total.first, progress.downloadedOperations)
87+
assertEquals(total.second, progress.totalOperations)
88+
89+
priorities.forEach { (priority, expected) ->
90+
val (expectedDownloaded, expectedTotal) = expected
91+
val progress = progress.untilPriority(priority)
92+
assertEquals(expectedDownloaded, progress.downloadedOperations)
93+
assertEquals(expectedTotal, progress.totalOperations)
94+
}
95+
}
96+
97+
private suspend fun ReceiveTurbine<SyncStatusData>.expectNotDownloading() {
98+
awaitItem().also {
99+
assertFalse { it.downloading }
100+
assertNull(it.downloadProgress)
101+
}
102+
}
103+
104+
@Test
105+
fun withoutPriorities() =
106+
databaseTest {
107+
database.connect(connector)
108+
109+
turbineScope {
110+
val turbine = database.currentStatus.asFlow().testIn(this)
111+
turbine.waitFor { it.connected && !it.downloading }
112+
113+
// Send checkpoint with 10 ops, progress should be 0/10
114+
syncLines.send(
115+
SyncLine.FullCheckpoint(
116+
Checkpoint(
117+
lastOpId = "10",
118+
checksums = listOf(bucket("a", 10)),
119+
),
120+
),
121+
)
122+
turbine.expectProgress(0 to 10)
123+
124+
addDataLine("a", 10)
125+
turbine.expectProgress(10 to 10)
126+
127+
addCheckpointComplete()
128+
turbine.expectNotDownloading()
129+
130+
// Emit new data, progress should be 0/2 instead of 10/12
131+
syncLines.send(
132+
SyncLine.CheckpointDiff(
133+
lastOpId = "12",
134+
updatedBuckets = listOf(bucket("a", 12)),
135+
removedBuckets = emptyList(),
136+
),
137+
)
138+
turbine.expectProgress(0 to 2)
139+
140+
addDataLine("a", 2)
141+
turbine.expectProgress(2 to 2)
142+
143+
addCheckpointComplete()
144+
turbine.expectNotDownloading()
145+
146+
turbine.cancel()
147+
}
148+
149+
database.close()
150+
syncLines.close()
151+
}
152+
153+
@Test
154+
fun interruptedSync() =
155+
databaseTest {
156+
database.connect(connector)
157+
158+
turbineScope {
159+
val turbine = database.currentStatus.asFlow().testIn(this)
160+
turbine.waitFor { it.connected && !it.downloading }
161+
162+
// Send checkpoint with 10 ops, progress should be 0/10
163+
syncLines.send(
164+
SyncLine.FullCheckpoint(
165+
Checkpoint(
166+
lastOpId = "10",
167+
checksums = listOf(bucket("a", 10)),
168+
),
169+
),
170+
)
171+
turbine.expectProgress(0 to 10)
172+
173+
addDataLine("a", 5)
174+
turbine.expectProgress(5 to 10)
175+
176+
turbine.cancel()
177+
}
178+
179+
// Emulate the app closing
180+
database.close()
181+
syncLines.close()
182+
183+
// And reconnecting
184+
database = openDatabase()
185+
syncLines = Channel()
186+
database.connect(connector)
187+
188+
turbineScope {
189+
val turbine = database.currentStatus.asFlow().testIn(this)
190+
turbine.waitFor { it.connected && !it.downloading }
191+
192+
// Send the same checkpoint as before
193+
syncLines.send(
194+
SyncLine.FullCheckpoint(
195+
Checkpoint(
196+
lastOpId = "10",
197+
checksums = listOf(bucket("a", 10)),
198+
),
199+
),
200+
)
201+
202+
// Progress should be restored: 5 / 10 instead of 0/5
203+
turbine.expectProgress(5 to 10)
204+
205+
addDataLine("a", 5)
206+
turbine.expectProgress(10 to 10)
207+
addCheckpointComplete()
208+
turbine.expectNotDownloading()
209+
210+
turbine.cancel()
211+
}
212+
213+
database.close()
214+
syncLines.close()
215+
}
216+
217+
@Test
218+
fun interruptedSyncWithNewCheckpoint() =
219+
databaseTest {
220+
database.connect(connector)
221+
222+
turbineScope {
223+
val turbine = database.currentStatus.asFlow().testIn(this)
224+
turbine.waitFor { it.connected && !it.downloading }
225+
syncLines.send(
226+
SyncLine.FullCheckpoint(
227+
Checkpoint(
228+
lastOpId = "10",
229+
checksums = listOf(bucket("a", 10)),
230+
),
231+
),
232+
)
233+
turbine.expectProgress(0 to 10)
234+
235+
addDataLine("a", 5)
236+
turbine.expectProgress(5 to 10)
237+
238+
turbine.cancel()
239+
}
240+
241+
// Close and re-connect
242+
database.close()
243+
syncLines.close()
244+
database = openDatabase()
245+
syncLines = Channel()
246+
database.connect(connector)
247+
248+
turbineScope {
249+
val turbine = database.currentStatus.asFlow().testIn(this)
250+
turbine.waitFor { it.connected && !it.downloading }
251+
252+
// Send checkpoint with two more ops
253+
syncLines.send(
254+
SyncLine.FullCheckpoint(
255+
Checkpoint(
256+
lastOpId = "12",
257+
checksums = listOf(bucket("a", 12)),
258+
),
259+
),
260+
)
261+
262+
turbine.expectProgress(5 to 12)
263+
264+
addDataLine("a", 7)
265+
turbine.expectProgress(12 to 12)
266+
addCheckpointComplete()
267+
turbine.expectNotDownloading()
268+
269+
turbine.cancel()
270+
}
271+
272+
database.close()
273+
syncLines.close()
274+
}
275+
276+
@Test
277+
fun differentPriorities() =
278+
databaseTest {
279+
database.connect(connector)
280+
281+
turbineScope {
282+
val turbine = database.currentStatus.asFlow().testIn(this)
283+
turbine.waitFor { it.connected && !it.downloading }
284+
285+
suspend fun expectProgress(
286+
prio0: Pair<Int, Int>,
287+
prio2: Pair<Int, Int>,
288+
) {
289+
turbine.expectProgress(prio2, mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2))
290+
}
291+
292+
syncLines.send(
293+
SyncLine.FullCheckpoint(
294+
Checkpoint(
295+
lastOpId = "10",
296+
checksums =
297+
listOf(
298+
bucket("a", 5, BucketPriority(0)),
299+
bucket("b", 5, BucketPriority(2)),
300+
),
301+
),
302+
),
303+
)
304+
expectProgress(0 to 5, 0 to 10)
305+
306+
addDataLine("a", 5)
307+
expectProgress(5 to 5, 5 to 10)
308+
309+
addCheckpointComplete(BucketPriority(0))
310+
expectProgress(5 to 5, 5 to 10)
311+
312+
addDataLine("b", 2)
313+
expectProgress(5 to 5, 7 to 10)
314+
315+
// Before syncing b fully, send a new checkpoint
316+
syncLines.send(
317+
SyncLine.CheckpointDiff(
318+
lastOpId = "14",
319+
updatedBuckets =
320+
listOf(
321+
bucket("a", 8, BucketPriority(0)),
322+
bucket("b", 6, BucketPriority(2)),
323+
),
324+
removedBuckets = emptyList(),
325+
),
326+
)
327+
expectProgress(5 to 8, 7 to 14)
328+
329+
addDataLine("a", 3)
330+
expectProgress(8 to 8, 10 to 14)
331+
addDataLine("b", 4)
332+
expectProgress(8 to 8, 14 to 14)
333+
334+
addCheckpointComplete()
335+
turbine.expectNotDownloading()
336+
337+
turbine.cancel()
338+
}
339+
340+
database.close()
341+
syncLines.close()
342+
}
343+
}

core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ internal class ActiveDatabaseTest(
8282
),
8383
)
8484

85-
val syncLines = Channel<SyncLine>()
85+
var syncLines = Channel<SyncLine>()
8686
var checkpointResponse: () -> WriteCheckpointResponse = {
8787
WriteCheckpointResponse(WriteCheckpointData("1000"))
8888
}

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ internal interface BucketStorage {
2929

3030
suspend fun getBucketStates(): List<BucketState>
3131

32+
suspend fun getBucketOperationProgress(): Map<String, LocalOperationCounters>
33+
3234
suspend fun removeBuckets(bucketsToDelete: List<String>)
3335

3436
suspend fun hasCompletedSync(): Boolean

0 commit comments

Comments
 (0)