Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor csr logic to context source caller #1311

Merged
merged 8 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
<ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID>
<ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityEventService.kt$EntityEventService$private fun publishAttributeChangeEvent( sub: String?, tenantName: String, entityId: URI, entityTypesAndPayload: Pair&lt;List&lt;ExpandedTerm&gt;, String&gt;, attributeOperationResult: SucceededAttributeOperationResult )</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping("/{entityId}", produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getByURI( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @AllowedParameters( implemented = [ QP.OPTIONS, QP.FORMAT, QP.TYPE, QP.ATTRS, QP.GEOMETRY_PROPERTY, QP.LANG, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.PICK, QP.OMIT, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping(produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getEntities( @RequestHeader httpHeaders: HttpHeaders, @AllowedParameters( implemented = [ QP.OPTIONS, QP.FORMAT, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q, QP.GEOMETRY, QP.GEOREL, QP.COORDINATES, QP.GEOPROPERTY, QP.GEOMETRY_PROPERTY, QP.LANG, QP.SCOPEQ, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:LinkedEntityServiceTests.kt$LinkedEntityServiceTests$@Test fun `it should inline entities up to the asked 2nd level`()</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun partialUpdatePatchProvider(): Stream&lt;Arguments&gt;</ID>
Expand All @@ -29,6 +27,8 @@
<ID>LongParameterList:EntityAttributeService.kt$EntityAttributeService$( entityUri: URI, ngsiLdAttributes: List&lt;NgsiLdAttribute&gt;, expandedAttributes: ExpandedAttributes, disallowOverwrite: Boolean, createdAt: ZonedDateTime, sub: Sub? )</ID>
<ID>LongParameterList:TemporalEntityHandler.kt$TemporalEntityHandler$( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @PathVariable attrId: String, @PathVariable instanceId: URI, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters(notImplemented = [QP.LOCAL, QP.VIA]) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; )</ID>
<ID>LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime )</ID>
<ID>MaxLineLength:DistributedEntityConsumptionServiceTests.kt$DistributedEntityConsumptionServiceTests$fun</ID>
<ID>MaximumLineLength:DistributedEntityConsumptionServiceTests.kt$DistributedEntityConsumptionServiceTests$</ID>
<ID>NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context)</ID>
<ID>SwallowedException:TemporalQueryUtils.kt$e: IllegalArgumentException</ID>
</CurrentIssues>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import arrow.core.getOrNone
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import arrow.core.separateEither
import arrow.fx.coroutines.parMap
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.model.ContextSourceRegistration
import com.egm.stellio.search.csr.model.MiscellaneousPersistentWarning
import com.egm.stellio.search.csr.model.MiscellaneousWarning
import com.egm.stellio.search.csr.model.NGSILDWarning
import com.egm.stellio.search.csr.model.Operation
import com.egm.stellio.search.csr.model.RevalidationFailedWarning
import com.egm.stellio.search.entity.model.EntitiesQueryFromGet
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.queryparameter.QueryParameter
import com.egm.stellio.shared.util.JsonUtils.deserializeAsList
Expand All @@ -20,6 +25,7 @@ import org.slf4j.LoggerFactory
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.stereotype.Service
import org.springframework.util.CollectionUtils
import org.springframework.util.MultiValueMap
import org.springframework.web.reactive.function.client.ClientResponse
Expand All @@ -30,10 +36,48 @@ import java.net.URI

typealias QueryEntitiesResponse = Pair<List<CompactedEntity>, Int?>

object ContextSourceCaller {
@Service
class DistributedEntityConsumptionService(
private val contextSourceRegistrationService: ContextSourceRegistrationService,
) {

val logger: Logger = LoggerFactory.getLogger(javaClass)

suspend fun retrieveContextSourceEntity(
suspend fun distributeRetrieveEntityOperation(
id: URI,
httpHeaders: HttpHeaders,
queryParams: MultiValueMap<String, String>
): Pair<List<NGSILDWarning>, List<CompactedEntityWithCSR>> {
val csrFilters =
CSRFilters(
ids = setOf(id),
operations = listOf(
Operation.RETRIEVE_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_OPS,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

// we can add parMap(concurrency = X) if this trigger too much http connexion at the same time
return matchingCSR.parMap { csr ->
val response = retrieveEntityFromContextSource(
httpHeaders,
csr,
id,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { it?.let { it to csr } }
}.separateEither()
.let { (warnings, maybeResponses) ->
warnings.toMutableList() to maybeResponses.filterNotNull()
}
}

suspend fun retrieveEntityFromContextSource(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
id: URI,
Expand All @@ -55,7 +99,45 @@ object ContextSourceCaller {
)
}

suspend fun queryContextSourceEntities(
suspend fun distributeQueryEntitiesOperation(
entitiesQuery: EntitiesQueryFromGet,
httpHeaders: HttpHeaders,
queryParams: MultiValueMap<String, String>
): Triple<List<NGSILDWarning>, List<CompactedEntitiesWithCSR>, List<Int?>> {
val csrFilters =
CSRFilters(
ids = entitiesQuery.ids,
idPattern = entitiesQuery.idPattern,
typeSelection = entitiesQuery.typeSelection,
operations = listOf(
Operation.QUERY_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_OPS,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

return matchingCSR.parMap { csr ->
val response = queryEntitiesFromContextSource(
httpHeaders,
csr,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { (entities, count) -> Triple(entities, csr, count) }
}.separateEither()
.let { (warnings, response) ->
Triple(
warnings.toMutableList(),
response.map { (entities, csr, _) -> entities to csr },
response.map { (_, _, counts) -> counts }
)
}
}

suspend fun queryEntitiesFromContextSource(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
params: MultiValueMap<String, String>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@ import arrow.core.getOrElse
import arrow.core.left
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
import arrow.core.raise.either
import arrow.core.right
import arrow.core.separateEither
import arrow.fx.coroutines.parMap
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.model.Operation
import com.egm.stellio.search.csr.model.addWarnings
import com.egm.stellio.search.csr.service.ContextSourceCaller
import com.egm.stellio.search.csr.service.ContextSourceRegistrationService
import com.egm.stellio.search.csr.service.ContextSourceUtils
import com.egm.stellio.search.csr.service.DistributedEntityConsumptionService
import com.egm.stellio.search.entity.service.EntityQueryService
import com.egm.stellio.search.entity.service.EntityService
import com.egm.stellio.search.entity.service.LinkedEntityService
Expand Down Expand Up @@ -76,7 +71,7 @@ class EntityHandler(
private val applicationProperties: ApplicationProperties,
private val entityService: EntityService,
private val entityQueryService: EntityQueryService,
private val contextSourceRegistrationService: ContextSourceRegistrationService,
private val distributedEntityConsumptionService: DistributedEntityConsumptionService,
private val linkedEntityService: LinkedEntityService
) : BaseHandler() {

Expand Down Expand Up @@ -206,28 +201,9 @@ class EntityHandler(
val sub = getSubFromSecurityContext()

val contexts = getContextFromLinkHeaderOrDefault(httpHeaders, applicationProperties.contexts.core).bind()
val entitiesQuery = composeEntitiesQueryFromGet(
applicationProperties.pagination,
queryParams,
contexts
).bind()
val entitiesQuery = composeEntitiesQueryFromGet(applicationProperties.pagination, queryParams, contexts).bind()
.validateMinimalQueryEntitiesParameters().bind()

val csrFilters =
CSRFilters(
ids = entitiesQuery.ids,
idPattern = entitiesQuery.idPattern,
typeSelection = entitiesQuery.typeSelection,
operations = listOf(
Operation.QUERY_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_OPS,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

val (entities, localCount) = entityQueryService.queryEntities(entitiesQuery, sub.getOrNull()).bind()

val filteredEntities = entities.filterAttributes(entitiesQuery.attrs, entitiesQuery.datasetId)
Expand All @@ -237,31 +213,21 @@ class EntityHandler(
linkedEntityService.processLinkedEntities(it, entitiesQuery, sub.getOrNull()).bind()
}

val (warnings, remoteEntitiesWithCSR, remoteCounts) = matchingCSR.parMap { csr ->
val response = ContextSourceCaller.queryContextSourceEntities(
val (queryWarnings, remoteEntitiesWithCSR, remoteCounts) =
distributedEntityConsumptionService.distributeQueryEntitiesOperation(
entitiesQuery,
httpHeaders,
csr,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { (entities, count) -> Triple(entities, csr, count) }
}.separateEither()
.let { (warnings, response) ->
Triple(
warnings.toMutableList(),
response.map { (entities, csr, _) -> entities to csr },
response.map { (_, _, counts) -> counts }
)
}

val maxCount = (remoteCounts + localCount).maxBy { it ?: 0 } ?: 0

val mergedEntities = ContextSourceUtils.mergeEntitiesLists(
val (warnings, mergedEntities) = ContextSourceUtils.mergeEntitiesLists(
localEntities,
remoteEntitiesWithCSR
).toPair().let { (mergeWarnings, mergedEntities) ->
mergeWarnings?.let { warnings.addAll(it) }
mergedEntities ?: emptyList()
val warnings = mergeWarnings?.let { queryWarnings + it } ?: queryWarnings
warnings to (mergedEntities ?: emptyList())
}

val ngsiLdDataRepresentation = parseRepresentations(queryParams, mediaType).bind()
Expand Down Expand Up @@ -305,19 +271,6 @@ class EntityHandler(
contexts
).bind()

val csrFilters =
CSRFilters(
ids = setOf(entityId),
operations = listOf(
Operation.RETRIEVE_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_OPS,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

val localEntity = either {
val expandedEntity = entityQueryService.queryEntity(entityId, sub.getOrNull()).bind()
expandedEntity.checkContainsAnyOf(entitiesQuery.attrs).bind()
Expand All @@ -328,20 +281,11 @@ class EntityHandler(
compactEntity(filteredExpandedEntity, contexts)
}

// we can add parMap(concurrency = X) if this trigger too much http connexion at the same time
val (warnings, remoteEntitiesWithCSR) = matchingCSR.parMap { csr ->
val response = ContextSourceCaller.retrieveContextSourceEntity(
httpHeaders,
csr,
entityId,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { it?.let { it to csr } }
}.separateEither()
.let { (warnings, maybeResponses) ->
warnings.toMutableList() to maybeResponses.filterNotNull()
}
val (warnings, remoteEntitiesWithCSR) = distributedEntityConsumptionService.distributeRetrieveEntityOperation(
entityId,
httpHeaders,
queryParams
).let { (warnings, it) -> warnings.toMutableList() to it }

val (mergeWarnings, mergedEntity) = ContextSourceUtils.mergeEntities(
localEntity.getOrNull(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.egm.stellio.search.authorization.web

import com.egm.stellio.search.authorization.service.AuthorizationService
import com.egm.stellio.search.common.config.SearchProperties
import com.egm.stellio.search.csr.service.ContextSourceRegistrationService
import com.egm.stellio.search.csr.service.DistributedEntityConsumptionService
import com.egm.stellio.search.entity.service.EntityEventService
import com.egm.stellio.search.entity.service.EntityQueryService
import com.egm.stellio.search.entity.service.EntityService
Expand Down Expand Up @@ -42,7 +42,7 @@ class AnonymousUserHandlerTests {
private lateinit var entityQueryService: EntityQueryService

@MockkBean
private lateinit var contextSourceRegistrationService: ContextSourceRegistrationService
private lateinit var distributedEntityConsumptionService: DistributedEntityConsumptionService

@MockkBean(relaxed = true)
private lateinit var linkedEntityService: LinkedEntityService
Expand Down
Loading
Loading