Skip to content

Commit

Permalink
Add ResourceConsolidator (#2137)
Browse files Browse the repository at this point in the history
* Add ResourceConsolidator

* Add ResourceConsolidator only with no upload mode

* only keep one consolidator

* spotless

* add kdocs

* update docos
  • Loading branch information
omarismail94 authored Sep 14, 2023
1 parent f949093 commit 0bbdeb9
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ import com.google.android.fhir.search.count
import com.google.android.fhir.search.execute
import com.google.android.fhir.sync.ConflictResolver
import com.google.android.fhir.sync.Resolved
import com.google.android.fhir.sync.upload.DefaultResourceConsolidator
import java.time.OffsetDateTime
import kotlinx.coroutines.flow.Flow
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
import timber.log.Timber

/** Implementation of [FhirEngine]. */
internal class FhirEngineImpl(private val database: Database, private val context: Context) :
Expand Down Expand Up @@ -81,15 +80,15 @@ internal class FhirEngineImpl(private val database: Database, private val contex

override suspend fun syncDownload(
conflictResolver: ConflictResolver,
download: suspend () -> Flow<List<Resource>>
download: suspend () -> Flow<List<Resource>>,
) {
download().collect { resources ->
database.withTransaction {
val resolved =
resolveConflictingResources(
resources,
getConflictingResourceIds(resources),
conflictResolver
conflictResolver,
)
database.insertSyncedResources(resources)
saveResolvedResourcesToDatabase(resolved)
Expand All @@ -107,7 +106,7 @@ internal class FhirEngineImpl(private val database: Database, private val contex
private suspend fun resolveConflictingResources(
resources: List<Resource>,
conflictingResourceIds: Set<String>,
conflictResolver: ConflictResolver
conflictResolver: ConflictResolver,
) =
resources
.filter { conflictingResourceIds.contains(it.logicalId) }
Expand All @@ -123,89 +122,12 @@ internal class FhirEngineImpl(private val database: Database, private val contex
.intersect(database.getAllLocalChanges().map { it.resourceId }.toSet())

override suspend fun syncUpload(
upload: suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>
upload: suspend (List<LocalChange>) -> Flow<Pair<LocalChangeToken, Resource>>,
) {
val resourceConsolidator = DefaultResourceConsolidator(database)
val localChanges = database.getAllLocalChanges()
if (localChanges.isNotEmpty()) {
upload(localChanges).collect {
database.deleteUpdates(it.first)
when (it.second) {
is Bundle -> updateVersionIdAndLastUpdated(it.second as Bundle)
else -> updateVersionIdAndLastUpdated(it.second)
}
}
upload(localChanges).collect { resourceConsolidator.consolidate(it.first, it.second) }
}
}

private suspend fun updateVersionIdAndLastUpdated(bundle: Bundle) {
when (bundle.type) {
Bundle.BundleType.TRANSACTIONRESPONSE -> {
bundle.entry.forEach {
when {
it.hasResource() -> updateVersionIdAndLastUpdated(it.resource)
it.hasResponse() -> updateVersionIdAndLastUpdated(it.response)
}
}
}
else -> {
// Leave it for now.
Timber.i("Received request to update meta values for ${bundle.type}")
}
}
}

private suspend fun updateVersionIdAndLastUpdated(response: Bundle.BundleEntryResponseComponent) {
if (response.hasEtag() && response.hasLastModified() && response.hasLocation()) {
response.resourceIdAndType?.let { (id, type) ->
database.updateVersionIdAndLastUpdated(
id,
type,
getVersionFromETag(response.etag),
response.lastModified.toInstant()
)
}
}
}

private suspend fun updateVersionIdAndLastUpdated(resource: Resource) {
if (resource.hasMeta() && resource.meta.hasVersionId() && resource.meta.hasLastUpdated()) {
database.updateVersionIdAndLastUpdated(
resource.id,
resource.resourceType,
resource.meta.versionId,
resource.meta.lastUpdated.toInstant()
)
}
}

/**
* FHIR uses weak ETag that look something like W/"MTY4NDMyODE2OTg3NDUyNTAwMA", so we need to
* extract version from it. See https://hl7.org/fhir/http.html#Http-Headers.
*/
private fun getVersionFromETag(eTag: String) =
// The server should always return a weak etag that starts with W, but if it server returns a
// strong tag, we store it as-is. The http-headers for conditional upload like if-match will
// always add value as a weak tag.
if (eTag.startsWith("W/")) {
eTag.split("\"")[1]
} else {
eTag
}

/**
* May return a Pair of versionId and resource type extracted from the
* [Bundle.BundleEntryResponseComponent.location].
*
* [Bundle.BundleEntryResponseComponent.location] may be:
*
* 1. absolute path: `<server-path>/<resource-type>/<resource-id>/_history/<version>`
*
* 2. relative path: `<resource-type>/<resource-id>/_history/<version>`
*/
private val Bundle.BundleEntryResponseComponent.resourceIdAndType: Pair<String, ResourceType>?
get() =
location
?.split("/")
?.takeIf { it.size > 3 }
?.let { it[it.size - 3] to ResourceType.fromCode(it[it.size - 4]) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.android.fhir.sync.upload

import com.google.android.fhir.LocalChangeToken
import com.google.android.fhir.db.Database
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
import timber.log.Timber

/**
* Represents a mechanism to consolidate resources after they are uploaded.
*
* INTERNAL ONLY. This interface should NEVER been exposed as an external API because it works
* together with other components in the upload package to fulfill a specific upload strategy. After
* a resource is uploaded to a remote FHIR server and a response is returned, we need to consolidate
* any changes in the database, Examples of this would be, updating the lastUpdated timestamp field,
* or deleting the local change from the database, or updating the resource IDs and payloads to
* correspond with the server’s feedback.
*/
internal fun interface ResourceConsolidator {

/** Consolidates the local change token with the provided response from the FHIR server. */
suspend fun consolidate(localChangeToken: LocalChangeToken, response: Resource)
}

/** Default implementation of [ResourceConsolidator] that uses the database to aid consolidation. */
internal class DefaultResourceConsolidator(private val database: Database) : ResourceConsolidator {

override suspend fun consolidate(localChangeToken: LocalChangeToken, response: Resource) {
database.deleteUpdates(localChangeToken)
when (response) {
is Bundle -> updateVersionIdAndLastUpdated(response)
else -> updateVersionIdAndLastUpdated(response)
}
}

private suspend fun updateVersionIdAndLastUpdated(bundle: Bundle) {
when (bundle.type) {
Bundle.BundleType.TRANSACTIONRESPONSE -> {
bundle.entry.forEach {
when {
it.hasResource() -> updateVersionIdAndLastUpdated(it.resource)
it.hasResponse() -> updateVersionIdAndLastUpdated(it.response)
}
}
}
else -> {
// Leave it for now.
Timber.i("Received request to update meta values for ${bundle.type}")
}
}
}

private suspend fun updateVersionIdAndLastUpdated(response: Bundle.BundleEntryResponseComponent) {
if (response.hasEtag() && response.hasLastModified() && response.hasLocation()) {
response.resourceIdAndType?.let { (id, type) ->
database.updateVersionIdAndLastUpdated(
id,
type,
getVersionFromETag(response.etag),
response.lastModified.toInstant(),
)
}
}
}

private suspend fun updateVersionIdAndLastUpdated(resource: Resource) {
if (resource.hasMeta() && resource.meta.hasVersionId() && resource.meta.hasLastUpdated()) {
database.updateVersionIdAndLastUpdated(
resource.id,
resource.resourceType,
resource.meta.versionId,
resource.meta.lastUpdated.toInstant(),
)
}
}

/**
* FHIR uses weak ETag that look something like W/"MTY4NDMyODE2OTg3NDUyNTAwMA", so we need to
* extract version from it. See https://hl7.org/fhir/http.html#Http-Headers.
*/
private fun getVersionFromETag(eTag: String) =
// The server should always return a weak etag that starts with W, but if it server returns a
// strong tag, we store it as-is. The http-headers for conditional upload like if-match will
// always add value as a weak tag.
if (eTag.startsWith("W/")) {
eTag.split("\"")[1]
} else {
eTag
}

/**
* May return a Pair of versionId and resource type extracted from the
* [Bundle.BundleEntryResponseComponent.location].
*
* [Bundle.BundleEntryResponseComponent.location] may be:
* 1. absolute path: `<server-path>/<resource-type>/<resource-id>/_history/<version>`
* 2. relative path: `<resource-type>/<resource-id>/_history/<version>`
*/
private val Bundle.BundleEntryResponseComponent.resourceIdAndType: Pair<String, ResourceType>?
get() =
location
?.split("/")
?.takeIf { it.size > 3 }
?.let { it[it.size - 3] to ResourceType.fromCode(it[it.size - 4]) }
}

0 comments on commit 0bbdeb9

Please sign in to comment.