From 0bbdeb92ebcdf46507ca98a84f68737e63e2ff38 Mon Sep 17 00:00:00 2001 From: Omar Ismail <44980219+omarismail94@users.noreply.github.com> Date: Thu, 14 Sep 2023 15:30:43 +0100 Subject: [PATCH] Add ResourceConsolidator (#2137) * Add ResourceConsolidator * Add ResourceConsolidator only with no upload mode * only keep one consolidator * spotless * add kdocs * update docos --- .../android/fhir/impl/FhirEngineImpl.kt | 92 +------------ .../fhir/sync/upload/ResourceConsolidator.kt | 122 ++++++++++++++++++ 2 files changed, 129 insertions(+), 85 deletions(-) create mode 100644 engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt diff --git a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt index 3d90ed3114..4c93efec9f 100644 --- a/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/impl/FhirEngineImpl.kt @@ -29,12 +29,11 @@ import com.google.android.fhir.search.count import com.google.android.fhir.search.execute import com.google.android.fhir.sync.ConflictResolver import com.google.android.fhir.sync.Resolved +import com.google.android.fhir.sync.upload.DefaultResourceConsolidator import java.time.OffsetDateTime import kotlinx.coroutines.flow.Flow -import org.hl7.fhir.r4.model.Bundle import org.hl7.fhir.r4.model.Resource import org.hl7.fhir.r4.model.ResourceType -import timber.log.Timber /** Implementation of [FhirEngine]. */ internal class FhirEngineImpl(private val database: Database, private val context: Context) : @@ -81,7 +80,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex override suspend fun syncDownload( conflictResolver: ConflictResolver, - download: suspend () -> Flow> + download: suspend () -> Flow>, ) { download().collect { resources -> database.withTransaction { @@ -89,7 +88,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex resolveConflictingResources( resources, getConflictingResourceIds(resources), - conflictResolver + conflictResolver, ) database.insertSyncedResources(resources) saveResolvedResourcesToDatabase(resolved) @@ -107,7 +106,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex private suspend fun resolveConflictingResources( resources: List, conflictingResourceIds: Set, - conflictResolver: ConflictResolver + conflictResolver: ConflictResolver, ) = resources .filter { conflictingResourceIds.contains(it.logicalId) } @@ -123,89 +122,12 @@ internal class FhirEngineImpl(private val database: Database, private val contex .intersect(database.getAllLocalChanges().map { it.resourceId }.toSet()) override suspend fun syncUpload( - upload: suspend (List) -> Flow> + upload: suspend (List) -> Flow>, ) { + val resourceConsolidator = DefaultResourceConsolidator(database) val localChanges = database.getAllLocalChanges() if (localChanges.isNotEmpty()) { - upload(localChanges).collect { - database.deleteUpdates(it.first) - when (it.second) { - is Bundle -> updateVersionIdAndLastUpdated(it.second as Bundle) - else -> updateVersionIdAndLastUpdated(it.second) - } - } + upload(localChanges).collect { resourceConsolidator.consolidate(it.first, it.second) } } } - - private suspend fun updateVersionIdAndLastUpdated(bundle: Bundle) { - when (bundle.type) { - Bundle.BundleType.TRANSACTIONRESPONSE -> { - bundle.entry.forEach { - when { - it.hasResource() -> updateVersionIdAndLastUpdated(it.resource) - it.hasResponse() -> updateVersionIdAndLastUpdated(it.response) - } - } - } - else -> { - // Leave it for now. - Timber.i("Received request to update meta values for ${bundle.type}") - } - } - } - - private suspend fun updateVersionIdAndLastUpdated(response: Bundle.BundleEntryResponseComponent) { - if (response.hasEtag() && response.hasLastModified() && response.hasLocation()) { - response.resourceIdAndType?.let { (id, type) -> - database.updateVersionIdAndLastUpdated( - id, - type, - getVersionFromETag(response.etag), - response.lastModified.toInstant() - ) - } - } - } - - private suspend fun updateVersionIdAndLastUpdated(resource: Resource) { - if (resource.hasMeta() && resource.meta.hasVersionId() && resource.meta.hasLastUpdated()) { - database.updateVersionIdAndLastUpdated( - resource.id, - resource.resourceType, - resource.meta.versionId, - resource.meta.lastUpdated.toInstant() - ) - } - } - - /** - * FHIR uses weak ETag that look something like W/"MTY4NDMyODE2OTg3NDUyNTAwMA", so we need to - * extract version from it. See https://hl7.org/fhir/http.html#Http-Headers. - */ - private fun getVersionFromETag(eTag: String) = - // The server should always return a weak etag that starts with W, but if it server returns a - // strong tag, we store it as-is. The http-headers for conditional upload like if-match will - // always add value as a weak tag. - if (eTag.startsWith("W/")) { - eTag.split("\"")[1] - } else { - eTag - } - - /** - * May return a Pair of versionId and resource type extracted from the - * [Bundle.BundleEntryResponseComponent.location]. - * - * [Bundle.BundleEntryResponseComponent.location] may be: - * - * 1. absolute path: `///_history/` - * - * 2. relative path: `//_history/` - */ - private val Bundle.BundleEntryResponseComponent.resourceIdAndType: Pair? - get() = - location - ?.split("/") - ?.takeIf { it.size > 3 } - ?.let { it[it.size - 3] to ResourceType.fromCode(it[it.size - 4]) } } diff --git a/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt b/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt new file mode 100644 index 0000000000..d8f19fc5a2 --- /dev/null +++ b/engine/src/main/java/com/google/android/fhir/sync/upload/ResourceConsolidator.kt @@ -0,0 +1,122 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.android.fhir.sync.upload + +import com.google.android.fhir.LocalChangeToken +import com.google.android.fhir.db.Database +import org.hl7.fhir.r4.model.Bundle +import org.hl7.fhir.r4.model.Resource +import org.hl7.fhir.r4.model.ResourceType +import timber.log.Timber + +/** + * Represents a mechanism to consolidate resources after they are uploaded. + * + * INTERNAL ONLY. This interface should NEVER been exposed as an external API because it works + * together with other components in the upload package to fulfill a specific upload strategy. After + * a resource is uploaded to a remote FHIR server and a response is returned, we need to consolidate + * any changes in the database, Examples of this would be, updating the lastUpdated timestamp field, + * or deleting the local change from the database, or updating the resource IDs and payloads to + * correspond with the server’s feedback. + */ +internal fun interface ResourceConsolidator { + + /** Consolidates the local change token with the provided response from the FHIR server. */ + suspend fun consolidate(localChangeToken: LocalChangeToken, response: Resource) +} + +/** Default implementation of [ResourceConsolidator] that uses the database to aid consolidation. */ +internal class DefaultResourceConsolidator(private val database: Database) : ResourceConsolidator { + + override suspend fun consolidate(localChangeToken: LocalChangeToken, response: Resource) { + database.deleteUpdates(localChangeToken) + when (response) { + is Bundle -> updateVersionIdAndLastUpdated(response) + else -> updateVersionIdAndLastUpdated(response) + } + } + + private suspend fun updateVersionIdAndLastUpdated(bundle: Bundle) { + when (bundle.type) { + Bundle.BundleType.TRANSACTIONRESPONSE -> { + bundle.entry.forEach { + when { + it.hasResource() -> updateVersionIdAndLastUpdated(it.resource) + it.hasResponse() -> updateVersionIdAndLastUpdated(it.response) + } + } + } + else -> { + // Leave it for now. + Timber.i("Received request to update meta values for ${bundle.type}") + } + } + } + + private suspend fun updateVersionIdAndLastUpdated(response: Bundle.BundleEntryResponseComponent) { + if (response.hasEtag() && response.hasLastModified() && response.hasLocation()) { + response.resourceIdAndType?.let { (id, type) -> + database.updateVersionIdAndLastUpdated( + id, + type, + getVersionFromETag(response.etag), + response.lastModified.toInstant(), + ) + } + } + } + + private suspend fun updateVersionIdAndLastUpdated(resource: Resource) { + if (resource.hasMeta() && resource.meta.hasVersionId() && resource.meta.hasLastUpdated()) { + database.updateVersionIdAndLastUpdated( + resource.id, + resource.resourceType, + resource.meta.versionId, + resource.meta.lastUpdated.toInstant(), + ) + } + } + + /** + * FHIR uses weak ETag that look something like W/"MTY4NDMyODE2OTg3NDUyNTAwMA", so we need to + * extract version from it. See https://hl7.org/fhir/http.html#Http-Headers. + */ + private fun getVersionFromETag(eTag: String) = + // The server should always return a weak etag that starts with W, but if it server returns a + // strong tag, we store it as-is. The http-headers for conditional upload like if-match will + // always add value as a weak tag. + if (eTag.startsWith("W/")) { + eTag.split("\"")[1] + } else { + eTag + } + + /** + * May return a Pair of versionId and resource type extracted from the + * [Bundle.BundleEntryResponseComponent.location]. + * + * [Bundle.BundleEntryResponseComponent.location] may be: + * 1. absolute path: `///_history/` + * 2. relative path: `//_history/` + */ + private val Bundle.BundleEntryResponseComponent.resourceIdAndType: Pair? + get() = + location + ?.split("/") + ?.takeIf { it.size > 3 } + ?.let { it[it.size - 3] to ResourceType.fromCode(it[it.size - 4]) } +}