Skip to content

Commit

Permalink
CSR Implementation (#1232)
Browse files Browse the repository at this point in the history
* Revert "chore(deps): Bump org.springframework.boot from 3.3.4 to 3.3.5 (#1254)"

This reverts commit 5f67fc2.

* wip: csr crud

* fix: detekt new rules

* Feature/csr retrieve entity (#1246)

* wip: not tested merge entities function

* wip: not tested merge entities function

* feat: retrieve Entity Working

* feat: first error handling and fix pr comments

* feat: test for mergeEntity

* fix: working Where statement for CSR / add some tests on Query CSR / some typos and naming

* feat: fix merging returning lists and PR comments

* feat: fix merging returning lists and PR comments

* feat: updating

* feat: MR comment - clearer name - no warning impact in ApiException - merging return warnings

* feat: get warnings from merge

* feat: separate context source utils + work with sysAttrs

* feat: warning following rfc7234 + docker-compose for csr + fix tests

* feat: doc csr launch

* feat: doc csr launch

* chore: slight refactoring of the docker compose context source instance

* chore: misc typo, wording and naming

* feat: always call context source for normalized representation + PR comments

* feat: registrationName in CSR + Warning fixes

* WIP

* feat: filter on operations in request

* feat: move filterBuilding to service

* feat: only define csf in CSR Filters

* feat: check merge do not merge entity in error

* fix: detekt new rules

* fix: PR comments

* fix: test for csfFilter

* refacto: rename test

---------

Co-authored-by: Benoit Orihuela <[email protected]>

* refacto: rename handler package to web

---------

Co-authored-by: Benoit Orihuela <[email protected]>
  • Loading branch information
thomasBousselin and bobeal authored Nov 15, 2024
1 parent aa3454d commit 1bd270a
Show file tree
Hide file tree
Showing 37 changed files with 2,546 additions and 44 deletions.
15 changes: 15 additions & 0 deletions .context-source.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# use this file with : docker compose --env-file .env --env-file .context-source.env -p stellio-context-source up
API_GATEWAY_PORT=8090
KAFKA_PORT=29093
POSTGRES_PORT=5433
SEARCH_SERVICE_PORT=8093
SUBSCRIPTION_SERVICE_PORT=8094

# Used by subscription service when searching entities for recurring subscriptions
# (those defined with a timeInterval parameter)
SUBSCRIPTION_ENTITY_SERVICE_URL=http://search-service:8093

# Used as a base URL by subscription service when serving contexts for notifications
SUBSCRIPTION_STELLIO_URL=http://localhost:8090

CONTAINER_NAME_PREFIX=context-source-
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ APPLICATION_TENANTS_0_DBSCHEMA=public
APPLICATION_PAGINATION_LIMIT_DEFAULT=30
APPLICATION_PAGINATION_LIMIT_MAX=100
APPLICATION_PAGINATION_TEMPORAL_LIMIT=10000

CONTAINER_NAME_PREFIX=
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class ApiGatewayApplication {
"/ngsi-ld/v1/types/**",
"/ngsi-ld/v1/attributes/**",
"/ngsi-ld/v1/temporal/entities/**",
"/ngsi-ld/v1/temporal/entityOperations/**"
"/ngsi-ld/v1/temporal/entityOperations/**",
"/ngsi-ld/v1/csourceRegistrations/**"
).uri("http://$searchServiceUrl:8083")
}
.route { p ->
Expand Down
23 changes: 13 additions & 10 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# you can launch a second instance of Stellio with (for instance to use it as a context source)
# docker compose --env-file .env --env-file .context-source.env -p stellio-context-source up -d
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
container_name: stellio-kafka
container_name: "${CONTAINER_NAME_PREFIX}stellio-kafka"
hostname: stellio-kafka
ports:
- "29092:29092"
- "${KAFKA_PORT:-29092}:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
Expand All @@ -20,15 +22,15 @@ services:
CLUSTER_ID: ZGE2MTQ4NDk4NGU3NDE2Mm
postgres:
image: stellio/stellio-timescale-postgis:16-2.16.0-3.3
container_name: stellio-postgres
container_name: "${CONTAINER_NAME_PREFIX}stellio-postgres"
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASS=${POSTGRES_PASS}
- POSTGRES_DBNAME=${POSTGRES_DBNAME}
- POSTGRES_MULTIPLE_EXTENSIONS=postgis,timescaledb,pgcrypto
- ACCEPT_TIMESCALE_TUNING=TRUE
ports:
- "5432:5432"
- "${POSTGRES_PORT:-5432}:5432"
volumes:
- stellio-postgres-storage:/var/lib/postgresql
healthcheck:
Expand All @@ -38,14 +40,14 @@ services:
retries: 20
start_period: 10s
api-gateway:
container_name: stellio-api-gateway
container_name: "${CONTAINER_NAME_PREFIX}stellio-api-gateway"
image: stellio/stellio-api-gateway:${STELLIO_DOCKER_TAG}
environment:
- SPRING_PROFILES_ACTIVE=${ENVIRONMENT}
ports:
- "8080:8080"
- "${API_GATEWAY_PORT:-8080}:8080"
search-service:
container_name: stellio-search-service
container_name: "${CONTAINER_NAME_PREFIX}stellio-search-service"
image: stellio/stellio-search-service:${STELLIO_DOCKER_TAG}
environment:
- SPRING_PROFILES_ACTIVE=${ENVIRONMENT}
Expand All @@ -62,14 +64,14 @@ services:
- APPLICATION_PAGINATION_TEMPORAL-LIMIT=${APPLICATION_PAGINATION_TEMPORAL_LIMIT}

ports:
- "8083:8083"
- "${SEARCH_SERVICE_PORT:-8083}:8083"
depends_on:
postgres:
condition: service_healthy
kafka:
condition: service_started
subscription-service:
container_name: stellio-subscription-service
container_name: "${CONTAINER_NAME_PREFIX}stellio-subscription-service"
image: stellio/stellio-subscription-service:${STELLIO_DOCKER_TAG}
environment:
- SPRING_PROFILES_ACTIVE=${ENVIRONMENT}
Expand All @@ -86,7 +88,7 @@ services:
- APPLICATION_PAGINATION_LIMIT-DEFAULT=${APPLICATION_PAGINATION_LIMIT_DEFAULT}
- APPLICATION_PAGINATION_LIMIT-MAX=${APPLICATION_PAGINATION_LIMIT_MAX}
ports:
- "8084:8084"
- "${SUBSCRIPTION_SERVICE_PORT:-8084}:8084"
depends_on:
postgres:
condition: service_healthy
Expand All @@ -95,3 +97,4 @@ services:

volumes:
stellio-postgres-storage:
name: "${CONTAINER_NAME_PREFIX}stellio-postgres-storage"
1 change: 1 addition & 0 deletions search-service/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
runtimeOnly("org.postgresql:postgresql")
runtimeOnly("io.r2dbc:r2dbc-pool")

testImplementation("org.wiremock:wiremock-standalone:3.3.1")
testImplementation("org.testcontainers:postgresql")
testImplementation("org.testcontainers:kafka")
testImplementation("org.testcontainers:r2dbc")
Expand Down
1 change: 1 addition & 0 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@
<ID>LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime )</ID>
<ID>NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context)</ID>
<ID>SwallowedException:TemporalQueryUtils.kt$e: IllegalArgumentException</ID>
<ID>TooGenericExceptionCaught:ContextSourceCaller.kt$ContextSourceCaller$e: Exception</ID>
</CurrentIssues>
</SmellBaseline>
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import arrow.core.raise.ensure
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.EntitySelector
import com.egm.stellio.shared.util.JsonUtils
import com.egm.stellio.shared.util.DataTypes

/**
* A Query data type as defined in 5.2.23.
Expand All @@ -29,7 +29,7 @@ data class Query private constructor(
companion object {
operator fun invoke(queryBody: String): Either<APIException, Query> = either {
runCatching {
JsonUtils.deserializeDataTypeAs<Query>(queryBody)
DataTypes.deserializeAs<Query>(queryBody)
}.fold(
{
ensure(it.type == "Query") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.egm.stellio.search.csr.model

import java.net.URI

data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
val ids: Set<URI> = emptySet(),
val csf: String? = null,
) {
constructor(ids: Set<URI> = emptySet(), operations: List<Operation>) :
this(
ids = ids,
csf = operations.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package com.egm.stellio.search.csr.model

import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.toAPIException
import com.egm.stellio.shared.util.DataTypes
import com.egm.stellio.shared.util.JSON_LD_MEDIA_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_CONTEXT
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_CSR_TERM
import com.egm.stellio.shared.util.JsonLdUtils.compactTerm
import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.shared.util.JsonUtils.deserializeAs
import com.egm.stellio.shared.util.JsonUtils.serializeObject
import com.egm.stellio.shared.util.invalidUriMessage
import com.egm.stellio.shared.util.ngsiLdDateTime
import com.egm.stellio.shared.util.toUri
import com.fasterxml.jackson.annotation.JsonFormat
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.convertValue
import org.springframework.http.MediaType
import java.net.URI
import java.time.ZonedDateTime
import java.util.UUID
import java.util.regex.Pattern

data class ContextSourceRegistration(
val id: URI = "urn:ngsi-ld:ContextSourceRegistration:${UUID.randomUUID()}".toUri(),
val endpoint: URI,
val registrationName: String? = null,
val type: String = NGSILD_CSR_TERM,
val mode: Mode = Mode.INCLUSIVE,
val information: List<RegistrationInfo> = emptyList(),
val operations: List<Operation> = listOf(Operation.FEDERATION_OPS),
val createdAt: ZonedDateTime = ngsiLdDateTime(),
val modifiedAt: ZonedDateTime? = null,
val observationInterval: TimeInterval? = null,
val managementInterval: TimeInterval? = null,
val status: StatusType? = null,
val timesSent: Int = 0,
val timesFailed: Int = 0,
val lastFailure: ZonedDateTime? = null,
val lastSuccess: ZonedDateTime? = null,
) {

fun isAuxiliary(): Boolean = mode == Mode.AUXILIARY

data class TimeInterval(
val start: ZonedDateTime,
val end: ZonedDateTime? = null
)

data class RegistrationInfo(
val entities: List<EntityInfo>?,
val propertyNames: List<String>?,
val relationshipNames: List<String>?
) {
fun expand(contexts: List<String>): RegistrationInfo =
RegistrationInfo(
entities = entities?.map { it.expand(contexts) },
propertyNames = propertyNames?.map { expandJsonLdTerm(it, contexts) },
relationshipNames = relationshipNames?.map { expandJsonLdTerm(it, contexts) },
)

fun compact(contexts: List<String>): RegistrationInfo =
RegistrationInfo(
entities = entities?.map { it.compact(contexts) },
propertyNames = propertyNames?.map { compactTerm(it, contexts) },
relationshipNames = relationshipNames?.map { compactTerm(it, contexts) }
)

fun validate(): Either<BadRequestDataException, Unit> = either {
return if (entities != null || propertyNames != null || relationshipNames != null) {
entities?.forEach { it.validate().bind() }
Unit.right()
} else {
BadRequestDataException("RegistrationInfo should have at least one element").left()
}
}
}

data class EntityInfo(
val id: URI? = null,
val idPattern: String? = null,
@JsonFormat(
with = [
JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
JsonFormat.Feature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED
]
)
val type: List<String>
) {
fun expand(contexts: List<String>): EntityInfo =
this.copy(
type = type.map { expandJsonLdTerm(it, contexts) },
)

fun compact(contexts: List<String>): EntityInfo =
this.copy(
type = type.map { compactTerm(it, contexts) },
)

fun validate(): Either<BadRequestDataException, Unit> {
val result = runCatching {
idPattern?.let { Pattern.compile(it) }
true
}.fold({ true }, { false })

return if (result)
Unit.right()
else BadRequestDataException("Invalid idPattern found in contextSourceRegistration").left()
}
}
fun expand(contexts: List<String>): ContextSourceRegistration =
this.copy(
information = information.map { it.expand(contexts) }
)

fun compact(contexts: List<String>): ContextSourceRegistration =
this.copy(
information = information.map { it.compact(contexts) }
)

fun serialize(
contexts: List<String>,
mediaType: MediaType = JSON_LD_MEDIA_TYPE,
includeSysAttrs: Boolean = false
): String {
return DataTypes.mapper.writeValueAsString(
DataTypes.mapper.convertValue<Map<String, Any>>(
this.compact(contexts)
).plus(
JSONLD_CONTEXT to contexts
).let { DataTypes.toFinalRepresentation(it, mediaType, includeSysAttrs) }
)
}

fun validate() = either {
checkTypeIsContextSourceRegistration().bind()
checkIdIsValid().bind()
information.map { it.validate().bind() }
}

private fun checkTypeIsContextSourceRegistration(): Either<APIException, Unit> =
if (type != NGSILD_CSR_TERM)
BadRequestDataException("type attribute must be equal to 'ContextSourceRegistration'").left()
else Unit.right()

private fun checkIdIsValid(): Either<APIException, Unit> =
if (!id.isAbsolute)
BadRequestDataException(invalidUriMessage("$id")).left()
else Unit.right()
companion object {

fun deserialize(
input: Map<String, Any>,
contexts: List<String>
): Either<APIException, ContextSourceRegistration> =
runCatching {
deserializeAs<ContextSourceRegistration>(serializeObject(input.plus(JSONLD_CONTEXT to contexts)))
.expand(contexts)
}.fold(
{ it.right() },
{ it.toAPIException("Failed to parse CSourceRegistration caused by :\n ${it.message}").left() }
)

fun notFoundMessage(id: URI) = "Could not find a CSourceRegistration with id $id"
fun alreadyExistsMessage(id: URI) = "A CSourceRegistration with id $id already exists"
fun unauthorizedMessage(id: URI) = "User is not authorized to access CSourceRegistration $id"
}

enum class StatusType(val status: String) {
@JsonProperty("ok")
OK("ok"),

@JsonProperty("failed")
FAILED("failed")
}
}

fun List<ContextSourceRegistration>.serialize(
contexts: List<String>,
mediaType: MediaType = JSON_LD_MEDIA_TYPE,
includeSysAttrs: Boolean = false
): String = this.map {
it.serialize(contexts, mediaType, includeSysAttrs)
}.toString()
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.egm.stellio.search.csr.model

import com.fasterxml.jackson.annotation.JsonProperty

enum class Mode(val key: String) {
@JsonProperty("inclusive")
INCLUSIVE("inclusive"),

@JsonProperty("exclusive")
EXCLUSIVE("exclusive"),

@JsonProperty("redirect")
REDIRECT("redirect"),

@JsonProperty("auxiliary")
AUXILIARY("auxiliary");
companion object {
fun fromString(mode: String?): Mode =
Mode.entries.firstOrNull { it.key == mode } ?: INCLUSIVE
}
}
Loading

0 comments on commit 1bd270a

Please sign in to comment.