Skip to content

Commit

Permalink
Remove UploadWorkManager (#2166)
Browse files Browse the repository at this point in the history
* Remove UploadWorkManager

* Fix tests
  • Loading branch information
jingtang10 authored Sep 14, 2023
1 parent 66accf7 commit f949093
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 336 deletions.
9 changes: 4 additions & 5 deletions engine/src/main/java/com/google/android/fhir/sync/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ val defaultRetryConfiguration =
object SyncDataParams {
const val SORT_KEY = "_sort"
const val LAST_UPDATED_KEY = "_lastUpdated"
const val ADDRESS_COUNTRY_KEY = "address-country"
const val SUMMARY_KEY = "_summary"
const val SUMMARY_COUNT_VALUE = "count"
}
Expand All @@ -60,14 +59,14 @@ class PeriodicSyncConfiguration(
val repeat: RepeatInterval,

/** Configuration for synchronization retry */
val retryConfiguration: RetryConfiguration? = defaultRetryConfiguration
val retryConfiguration: RetryConfiguration? = defaultRetryConfiguration,
)

data class RepeatInterval(
/** The interval at which the sync should be triggered in */
val interval: Long,
/** The time unit for the repeat interval */
val timeUnit: TimeUnit
val timeUnit: TimeUnit,
)

fun ParamMap.concatParams(): String {
Expand All @@ -85,7 +84,7 @@ data class RetryConfiguration(
val backoffCriteria: BackoffCriteria,

/** Maximum retries for a failing [FhirSyncWorker] */
val maxRetries: Int
val maxRetries: Int,
)

/**
Expand All @@ -104,5 +103,5 @@ data class BackoffCriteria(
val backoffDelay: Long,

/** The time unit for [backoffDelay] */
val timeUnit: TimeUnit
val timeUnit: TimeUnit,
)
24 changes: 11 additions & 13 deletions engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ import com.google.android.fhir.FhirEngine
import com.google.android.fhir.FhirEngineProvider
import com.google.android.fhir.OffsetDateTimeTypeAdapter
import com.google.android.fhir.sync.download.DownloaderImpl
import com.google.android.fhir.sync.upload.SquashedChangesUploadWorkManager
import com.google.android.fhir.sync.upload.UploadWorkManager
import com.google.android.fhir.sync.upload.UploaderImpl
import com.google.android.fhir.sync.upload.Uploader
import com.google.gson.ExclusionStrategy
import com.google.gson.FieldAttributes
import com.google.gson.GsonBuilder
Expand All @@ -43,10 +41,9 @@ import timber.log.Timber
abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameters) :
CoroutineWorker(appContext, workerParams) {
abstract fun getFhirEngine(): FhirEngine

abstract fun getDownloadWorkManager(): DownloadWorkManager
private fun getUploadWorkManager(): UploadWorkManager {
return SquashedChangesUploadWorkManager()
}

abstract fun getConflictResolver(): ConflictResolver

private val gson =
Expand All @@ -64,9 +61,9 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
?: return Result.failure(
buildWorkData(
IllegalStateException(
"FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration."
)
)
"FhirEngineConfiguration.ServerConfiguration is not set. Call FhirEngineProvider.init to initialize with appropriate configuration.",
),
),
)

val flow = MutableSharedFlow<SyncJobStatus>()
Expand All @@ -88,9 +85,9 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
FhirSynchronizer(
applicationContext,
getFhirEngine(),
UploaderImpl(dataSource, getUploadWorkManager()),
Uploader(dataSource),
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver()
getConflictResolver(),
)
.apply { subscribe(flow) }
.synchronize()
Expand Down Expand Up @@ -125,7 +122,7 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
return workDataOf(
// send serialized state and type so that consumer can convert it back
"StateType" to state::class.java.name,
"State" to gson.toJson(state)
"State" to gson.toJson(state),
)
}

Expand All @@ -137,8 +134,9 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
* Exclusion strategy for [Gson] that handles field exclusions for [SyncJobStatus] returned by
* FhirSynchronizer. It should skip serializing the exceptions to avoid exceeding WorkManager
* WorkData limit
*
* @see <a
* href="https://github.com/google/android-fhir/issues/707">https://github.com/google/android-fhir/issues/707</a>
* href="https://github.com/google/android-fhir/issues/707">https://github.com/google/android-fhir/issues/707</a>
*/
internal class StateExclusionStrategy : ExclusionStrategy {
override fun shouldSkipField(field: FieldAttributes) = field.name.equals("exceptions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package com.google.android.fhir.sync
import android.content.Context
import com.google.android.fhir.DatastoreUtil
import com.google.android.fhir.FhirEngine
import com.google.android.fhir.sync.download.DownloadState
import com.google.android.fhir.sync.download.Downloader
import com.google.android.fhir.sync.upload.UploadState
import com.google.android.fhir.sync.upload.Uploader
import java.time.OffsetDateTime
Expand All @@ -28,13 +30,14 @@ import org.hl7.fhir.r4.model.ResourceType

enum class SyncOperation {
DOWNLOAD,
UPLOAD
UPLOAD,
}

private sealed class SyncResult {
val timestamp: OffsetDateTime = OffsetDateTime.now()

class Success : SyncResult()

data class Error(val exceptions: List<ResourceSyncException>) : SyncResult()
}

Expand All @@ -46,7 +49,7 @@ internal class FhirSynchronizer(
private val fhirEngine: FhirEngine,
private val uploader: Uploader,
private val downloader: Downloader,
private val conflictResolver: ConflictResolver
private val conflictResolver: ConflictResolver,
) {
private var syncState: MutableSharedFlow<SyncJobStatus>? = null
private val datastoreUtil = DatastoreUtil(context)
Expand Down Expand Up @@ -135,7 +138,7 @@ internal class FhirSynchronizer(
is UploadState.Success ->
emit(result.localChangeToken to result.resource).also {
setSyncState(
SyncJobStatus.InProgress(SyncOperation.UPLOAD, result.total, result.completed)
SyncJobStatus.InProgress(SyncOperation.UPLOAD, result.total, result.completed),
)
}
is UploadState.Failure -> exceptions.add(result.syncError)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Google LLC
* Copyright 2022-2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,8 +14,9 @@
* limitations under the License.
*/

package com.google.android.fhir.sync
package com.google.android.fhir.sync.download

import com.google.android.fhir.sync.ResourceSyncException
import kotlinx.coroutines.flow.Flow
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
Expand All @@ -24,7 +25,7 @@ import org.hl7.fhir.r4.model.ResourceType
internal interface Downloader {
/**
* @return Flow of the [DownloadState] which keeps emitting [Resource]s or Error based on the
* response of each page download request. It also updates progress if [ProgressCallback] exists
* response of each page download request. It also updates progress if [ProgressCallback] exists
*/
suspend fun download(): Flow<DownloadState>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ package com.google.android.fhir.sync.download
import com.google.android.fhir.sync.BundleDownloadRequest
import com.google.android.fhir.sync.DataSource
import com.google.android.fhir.sync.DownloadRequest
import com.google.android.fhir.sync.DownloadState
import com.google.android.fhir.sync.DownloadWorkManager
import com.google.android.fhir.sync.Downloader
import com.google.android.fhir.sync.ResourceSyncException
import com.google.android.fhir.sync.UrlDownloadRequest
import kotlinx.coroutines.flow.Flow
Expand All @@ -38,7 +36,7 @@ import timber.log.Timber
*/
internal class DownloaderImpl(
private val dataSource: DataSource,
private val downloadWorkManager: DownloadWorkManager
private val downloadWorkManager: DownloadWorkManager,
) : Downloader {
private val resourceTypeList = ResourceType.values().map { it.name }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType

typealias ResourceSearchParams = Map<ResourceType, ParamMap>

/**
* [DownloadWorkManager] implementation based on the provided [ResourceSearchParams] to generate
* [Resource] search queries and parse [Bundle.BundleType.SEARCHSET] type [Bundle]. This
Expand All @@ -39,14 +40,15 @@ typealias ResourceSearchParams = Map<ResourceType, ParamMap>
*/
class ResourceParamsBasedDownloadWorkManager(
syncParams: ResourceSearchParams,
val context: TimestampContext
val context: TimestampContext,
) : DownloadWorkManager {
private val resourcesToDownloadWithSearchParams = LinkedList(syncParams.entries)
private val urlOfTheNextPagesToDownloadForAResource = LinkedList<String>()

override suspend fun getNextRequest(): DownloadRequest? {
if (urlOfTheNextPagesToDownloadForAResource.isNotEmpty())
if (urlOfTheNextPagesToDownloadForAResource.isNotEmpty()) {
return urlOfTheNextPagesToDownloadForAResource.poll()?.let { DownloadRequest.of(it) }
}

return resourcesToDownloadWithSearchParams.poll()?.let { (resourceType, params) ->
val newParams =
Expand Down Expand Up @@ -74,7 +76,7 @@ class ResourceParamsBasedDownloadWorkManager(
private suspend fun getLastUpdatedParam(
resourceType: ResourceType,
params: ParamMap,
context: TimestampContext
context: TimestampContext,
): MutableMap<String, String> {
val newParams = mutableMapOf<String, String>()
if (!params.containsKey(SyncDataParams.SORT_KEY)) {
Expand Down Expand Up @@ -108,20 +110,22 @@ class ResourceParamsBasedDownloadWorkManager(

response.link
.firstOrNull { component -> component.relation == "next" }
?.url?.let { next -> urlOfTheNextPagesToDownloadForAResource.add(next) }
?.url
?.let { next -> urlOfTheNextPagesToDownloadForAResource.add(next) }

return response.entry
.map { it.resource }
.also { resources ->
resources
.groupBy { it.resourceType }
.entries.map { map ->
.entries
.map { map ->
map.value
.filter { it.meta.lastUpdated != null }
.let {
context.saveLastUpdatedTimestamp(
map.key,
it.maxOfOrNull { it.meta.lastUpdated }?.toTimeZoneString()
it.maxOfOrNull { it.meta.lastUpdated }?.toTimeZoneString(),
)
}
}
Expand All @@ -130,6 +134,7 @@ class ResourceParamsBasedDownloadWorkManager(

interface TimestampContext {
suspend fun saveLastUpdatedTimestamp(resourceType: ResourceType, timestamp: String?)

suspend fun getLasUpdateTimestamp(resourceType: ResourceType): String?
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit f949093

Please sign in to comment.