diff --git a/base/model/src/main/java/org/eclipse/ditto/base/model/headers/DittoHeaderDefinition.java b/base/model/src/main/java/org/eclipse/ditto/base/model/headers/DittoHeaderDefinition.java index 6313cf988d..882928eaab 100755 --- a/base/model/src/main/java/org/eclipse/ditto/base/model/headers/DittoHeaderDefinition.java +++ b/base/model/src/main/java/org/eclipse/ditto/base/model/headers/DittoHeaderDefinition.java @@ -527,6 +527,41 @@ public enum DittoHeaderDefinition implements HeaderDefinition { JsonObject.class, false, true, + HeaderValueValidators.getJsonObjectValidator()), + + /** + * Internal header containing the pre-defined configured {@code extraFields} as list of jsonPointers for the + * emitted thing event. + * + * @since 3.7.0 + */ + PRE_DEFINED_EXTRA_FIELDS("ditto-pre-defined-extra-fields", + JsonArray.class, + false, + false, + HeaderValueValidators.getJsonArrayValidator()), + + /** + * Internal header containing the pre-defined configured {@code extraFields} as keys and the allowed "read subjects" + * as array of stings - defining which "auth subjects" are allowed to read which pre-defined extra field. + * + * @since 3.7.0 + */ + PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT("ditto-pre-defined-extra-fields-read-grant", + JsonObject.class, + false, + false, + HeaderValueValidators.getJsonObjectValidator()), + + /** + * Internal header containing pre-defined {@code extraFields} as JSON object sent along for emitted thing event. + * + * @since 3.7.0 + */ + PRE_DEFINED_EXTRA_FIELDS_OBJECT("ditto-pre-defined-extra-fields-object", + JsonObject.class, + false, + false, HeaderValueValidators.getJsonObjectValidator()); /** diff --git a/base/model/src/test/java/org/eclipse/ditto/base/model/headers/ImmutableDittoHeadersTest.java b/base/model/src/test/java/org/eclipse/ditto/base/model/headers/ImmutableDittoHeadersTest.java index 5f8b115999..7a83049aae 100755 --- a/base/model/src/test/java/org/eclipse/ditto/base/model/headers/ImmutableDittoHeadersTest.java +++ b/base/model/src/test/java/org/eclipse/ditto/base/model/headers/ImmutableDittoHeadersTest.java @@ -137,6 +137,16 @@ public final class ImmutableDittoHeadersTest { .set(DittoHeaderDefinition.ORIGINATOR.getKey(), "foo:bar") .build(); + private static final JsonArray KNOWN_PRE_DEFINED_EXTRA_FIELDS = JsonArray.newBuilder() + .add("foo:bar:123") + .build(); + private static final JsonObject KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT = JsonObject.newBuilder() + .set("/definition", "known:subject") + .build(); + private static final JsonObject KNOWN_PRE_DEFINED_EXTRA_FIELDS_OBJECT = JsonObject.newBuilder() + .set("definition", "foo:bar:123") + .build(); + static { KNOWN_METADATA_HEADERS = MetadataHeaders.newInstance(); @@ -205,6 +215,12 @@ public void settingAllKnownHeadersWorksAsExpected() { .putHeader(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_REVISION)) .putHeader(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_TIMESTAMP)) .putHeader(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS.formatAsString()) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS.formatAsString()) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.formatAsString()) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_OBJECT.formatAsString()) .build(); assertThat(underTest).isEqualTo(expectedHeaderMap); @@ -535,6 +551,11 @@ public void toJsonReturnsExpected() { .set(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), KNOWN_AT_HISTORICAL_REVISION) .set(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), KNOWN_AT_HISTORICAL_TIMESTAMP.toString()) .set(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS) + .set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), KNOWN_PRE_DEFINED_EXTRA_FIELDS) + .set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT) + .set(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_OBJECT) .build(); final Map allKnownHeaders = createMapContainingAllKnownHeaders(); @@ -774,6 +795,12 @@ private static Map createMapContainingAllKnownHeaders() { result.put(DittoHeaderDefinition.AT_HISTORICAL_REVISION.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_REVISION)); result.put(DittoHeaderDefinition.AT_HISTORICAL_TIMESTAMP.getKey(), String.valueOf(KNOWN_AT_HISTORICAL_TIMESTAMP)); result.put(DittoHeaderDefinition.HISTORICAL_HEADERS.getKey(), KNOWN_HISTORICAL_HEADERS.formatAsString()); + result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS.formatAsString()); + result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.formatAsString()); + result.put(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + KNOWN_PRE_DEFINED_EXTRA_FIELDS_OBJECT.formatAsString()); return result; } diff --git a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java index fc37e2586d..effaad3df4 100644 --- a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java +++ b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java @@ -14,14 +14,17 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -38,8 +41,10 @@ import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger; import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName; +import org.eclipse.ditto.json.JsonArray; import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.json.JsonFieldSelector; +import org.eclipse.ditto.json.JsonKey; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonObjectBuilder; import org.eclipse.ditto.json.JsonPointer; @@ -160,6 +165,16 @@ public CompletionStage retrievePartialThing(final ThingId thingId, (concernedSignal instanceof ThingEvent) && !(ProtocolAdapter.isLiveSignal(concernedSignal)) ? List.of((ThingEvent) concernedSignal) : List.of(); + final DittoHeaders signalHeaders = Optional.ofNullable(concernedSignal) + .map(Signal::getDittoHeaders) + .orElseGet(DittoHeaders::empty); + if (jsonFieldSelector != null && + signalHeaders.containsKey(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey()) + ) { + return performPreDefinedExtraFieldsOptimization( + thingId, jsonFieldSelector, dittoHeaders, signalHeaders, thingEvents + ); + } // as second step only return what was originally requested as fields: final var cachingParameters = new CachingParameters(jsonFieldSelector, thingEvents, true, 0); @@ -199,6 +214,81 @@ public CompletionStage retrievePartialThing(final EntityId thingId, .thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector)); } + private CompletionStage performPreDefinedExtraFieldsOptimization(final ThingId thingId, + final JsonFieldSelector jsonFieldSelector, + final DittoHeaders dittoHeaders, + final DittoHeaders signalHeaders, + final List> thingEvents + ) { + final JsonArray configuredPredefinedExtraFields = + JsonArray.of(signalHeaders.get(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey())); + final Set allConfiguredPredefinedExtraFields = configuredPredefinedExtraFields.stream() + .filter(JsonValue::isString) + .map(JsonValue::asString) + .map(JsonPointer::of) + .collect(Collectors.toSet()); + + final JsonObject preDefinedExtraFields = + JsonObject.of(signalHeaders.get(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey())); + final CompletionStage filteredPreDefinedExtraFieldsReadGranted = + filterPreDefinedExtraReadGrantedObject(jsonFieldSelector, dittoHeaders, signalHeaders, + preDefinedExtraFields); + + final boolean allExtraFieldsPresent = + allConfiguredPredefinedExtraFields.containsAll(jsonFieldSelector.getPointers()); + if (allExtraFieldsPresent) { + return filteredPreDefinedExtraFieldsReadGranted; + } else { + // optimization to only fetch extra fields which were not pre-defined + final List missingFieldsPointers = new ArrayList<>(jsonFieldSelector.getPointers()); + missingFieldsPointers.removeAll(allConfiguredPredefinedExtraFields); + final JsonFieldSelector missingFieldsSelector = JsonFactory.newFieldSelector(missingFieldsPointers); + final var cachingParameters = + new CachingParameters(missingFieldsSelector, thingEvents, true, 0); + + return doRetrievePartialThing(thingId, dittoHeaders, null, cachingParameters) + .thenCompose(jsonObject -> filteredPreDefinedExtraFieldsReadGranted + .thenApply(preDefinedObject -> + JsonFactory.newObject( // merge + applyJsonFieldSelector(jsonObject, missingFieldsSelector), + preDefinedObject + ) + ) + ); + } + } + + private static CompletionStage filterPreDefinedExtraReadGrantedObject( + final JsonFieldSelector jsonFieldSelector, + final DittoHeaders dittoHeaders, final DittoHeaders signalHeaders, final JsonObject preDefinedExtraFields) { + final JsonObject preDefinedExtraFieldsReadGrant = JsonObject.of( + signalHeaders.get(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey()) + ); + final JsonFieldSelector grantedReadJsonFieldSelector = filterAskedForFieldSelectorToGrantedFields( + jsonFieldSelector, preDefinedExtraFieldsReadGrant, + dittoHeaders.getAuthorizationContext().getAuthorizationSubjectIds() + ); + return CompletableFuture.completedStage(preDefinedExtraFields.get(grantedReadJsonFieldSelector)); + } + + private static JsonFieldSelector filterAskedForFieldSelectorToGrantedFields( + final JsonFieldSelector jsonFieldSelector, + final JsonObject preDefinedExtraFieldsReadGrant, + final List authorizationSubjectIds) + { + final List allowedPointers = StreamSupport.stream(jsonFieldSelector.spliterator(), false) + .filter(pointer -> preDefinedExtraFieldsReadGrant.getValue(JsonKey.of(pointer.toString())) + .filter(JsonValue::isArray) + .map(JsonValue::asArray) + .filter(readGrantArray -> readGrantArray.stream() + .filter(JsonValue::isString) + .map(JsonValue::asString) + .anyMatch(authorizationSubjectIds::contains) + ).isPresent() + ).toList(); + return JsonFactory.newFieldSelector(allowedPointers); + } + protected CompletionStage doRetrievePartialThing(final EntityId thingId, final DittoHeaders dittoHeaders, @Nullable final DittoHeaders dittoHeadersNotAddedToCacheKey, @@ -369,11 +459,11 @@ private CompletionStage doSmartUpdateCachedObject(final SignalEnrich } private static T getLast(final List list) { - return list.get(list.size() - 1); + return list.getLast(); } private static T getFirst(final List list) { - return list.get(0); + return list.getFirst(); } private CompletionStage handleNextExpectedThingEvents(final SignalEnrichmentCacheKey cacheKey, diff --git a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractCachingSignalEnrichmentFacadeTest.java b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractCachingSignalEnrichmentFacadeTest.java index 2d51c3cc96..f62a080dcc 100644 --- a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractCachingSignalEnrichmentFacadeTest.java +++ b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/AbstractCachingSignalEnrichmentFacadeTest.java @@ -50,7 +50,7 @@ */ abstract class AbstractCachingSignalEnrichmentFacadeTest extends AbstractSignalEnrichmentFacadeTest { - private static final String ISSUER_PREFIX = "test:"; + protected static final String ISSUER_PREFIX = "test:"; private static final String CACHE_CONFIG_KEY = "my-cache"; private static final String CACHE_CONFIG = CACHE_CONFIG_KEY + """ { diff --git a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacadeTest.java b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacadeTest.java index 8b54abe56e..9099db2324 100644 --- a/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacadeTest.java +++ b/internal/models/signalenrichment/src/test/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacadeTest.java @@ -12,9 +12,28 @@ */ package org.eclipse.ditto.internal.models.signalenrichment; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CompletionStage; + import org.apache.pekko.testkit.javadsl.TestKit; +import org.eclipse.ditto.base.model.auth.AuthorizationContext; +import org.eclipse.ditto.base.model.auth.AuthorizationSubject; +import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType; +import org.eclipse.ditto.base.model.entity.metadata.MetadataModelFactory; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; +import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.base.model.signals.DittoTestSystem; import org.eclipse.ditto.internal.utils.cache.config.CacheConfig; +import org.eclipse.ditto.json.JsonFieldSelector; import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonPointer; +import org.eclipse.ditto.json.JsonValue; +import org.eclipse.ditto.things.model.ThingId; +import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing; +import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse; +import org.eclipse.ditto.things.model.signals.events.AttributeModified; +import org.junit.Test; /** * Unit tests for {@link DittoCachingSignalEnrichmentFacade}. @@ -27,7 +46,34 @@ public final class DittoCachingSignalEnrichmentFacadeTest extends AbstractCachin "attributes": {"x": 5}, "features": {"y": {"properties": {"z": true}}}, "_metadata": {"attributes": {"x": {"type": "x attribute"}}} - }"""); + }""" + ); + + private static final JsonObject EXPECTED_THING_JSON_PRE_DEFINED_EXTRA = JsonObject.of(""" + { + "definition": "some:cool:definition", + "attributes": {"x": 5, "pre": {"bar": [1,2,3]}, "pre2": {"some": 41, "secret": true}} + }""" + ); + + private static final AttributeModified THING_EVENT_PRE_DEFINED_EXTRA_FIELDS = AttributeModified.of( + ThingId.generateRandom("org.eclipse.test"), + JsonPointer.of("x"), + JsonValue.of(42), + 4L, + Instant.EPOCH, + DittoHeaders.newBuilder() + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), + "[\"/definition\",\"/attributes/pre\",\"/attributes/pre2\"]") + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + "{\"/definition\":[\"test:user\"],\"/attributes/pre\":[\"test:user\"]}") + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + "{\"definition\":\"some:cool:definition\",\"attributes\":{\"pre\":{\"bar\": [1,2,3]}}}") + .build(), + MetadataModelFactory.newMetadataBuilder() + .set("type", "x attribute") + .build()); + @Override protected CachingSignalEnrichmentFacade createCachingSignalEnrichmentFacade(final TestKit kit, @@ -44,5 +90,78 @@ protected JsonObject getExpectedThingJson() { return EXPECTED_THING_JSON; } + @Test + public void enrichedEventWithPreDefinedExtraFieldsDoesNotLeadToCacheLookup() { + DittoTestSystem.run(this, kit -> { + final SignalEnrichmentFacade underTest = + createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L)); + final ThingId thingId = ThingId.generateRandom(); + final String userId = ISSUER_PREFIX + "user"; + final DittoHeaders headers = DittoHeaders.newBuilder() + .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED, + AuthorizationSubject.newInstance(userId))) + .randomCorrelationId() + .build(); + final JsonFieldSelector fieldSelector = + JsonFieldSelector.newInstance("definition", "attributes/pre", "attributes/pre2"); + final CompletionStage askResult = underTest.retrievePartialThing(thingId, fieldSelector, + headers, THING_EVENT_PRE_DEFINED_EXTRA_FIELDS); + + // THEN: no cache lookup should be done + kit.expectNoMessage(Duration.ofSeconds(1)); + askResult.toCompletableFuture().join(); + // AND: the resulting thing JSON includes the with the updated value: + final JsonObject expectedThingJson = EXPECTED_THING_JSON_PRE_DEFINED_EXTRA.toBuilder() + .remove("attributes/x") // x was not asked for in extra fields + .remove("attributes/pre2") // we don't have the read grant for this field + .build(); + softly.assertThat(askResult).isCompletedWithValue(expectedThingJson); + }); + } + + @Test + public void enrichedEventWithPreDefinedExtraFieldsAndAdditionalRequestedOnesLeadToPartialCacheLookup() { + DittoTestSystem.run(this, kit -> { + final SignalEnrichmentFacade underTest = + createSignalEnrichmentFacadeUnderTest(kit, Duration.ofSeconds(10L)); + final ThingId thingId = ThingId.generateRandom(); + final String userId = ISSUER_PREFIX + "user"; + final DittoHeaders headers = DittoHeaders.newBuilder() + .authorizationContext(AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED, + AuthorizationSubject.newInstance(userId))) + .randomCorrelationId() + .build(); + final JsonFieldSelector fieldSelector = + JsonFieldSelector.newInstance("definition", "attributes/x", "attributes/unchanged", + "attributes/pre", "attributes/pre2"); + final CompletionStage askResult = underTest.retrievePartialThing(thingId, fieldSelector, + headers, THING_EVENT_PRE_DEFINED_EXTRA_FIELDS); + + final JsonFieldSelector askedForFieldSelector = + JsonFieldSelector.newInstance("attributes/x", "attributes/unchanged"); + // WHEN: Command handler receives expected RetrieveThing and responds with RetrieveThingResponse + final RetrieveThing retrieveThing = kit.expectMsgClass(RetrieveThing.class); + softly.assertThat(retrieveThing.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds()) + .contains(userId); + softly.assertThat(retrieveThing.getSelectedFields()).contains(actualSelectedFields(askedForFieldSelector)); + // WHEN: response is handled so that it is also added to the cache + final JsonObject retrievedExtraThing = JsonObject.of(""" + { + "_revision": 3, + "attributes": {"x": 42, "unchanged": "foo"} + } + """); + kit.reply(RetrieveThingResponse.of(thingId, retrievedExtraThing, headers)); + askResult.toCompletableFuture().join(); + + // AND: the resulting thing JSON includes the with the updated value: + final JsonObject expectedThingJson = EXPECTED_THING_JSON_PRE_DEFINED_EXTRA.toBuilder() + .remove("attributes/pre2") // we don't have the read grant for this field + .set(JsonPointer.of("attributes/x"), 42) // we expect the updated value (as part of the modify event) + .set(JsonPointer.of("attributes/unchanged"), "foo") // we expect the updated value (retrieved via cache) + .build(); + softly.assertThat(askResult).isCompletedWithValue(expectedThingJson); + }); + } } diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/EventConfig.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/EventConfig.java index 436fdcc228..27779603be 100644 --- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/EventConfig.java +++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/config/EventConfig.java @@ -36,7 +36,7 @@ public interface EventConfig { /** * An enumeration of the known config path expressions and their associated default values for - * {@code SnapshotConfig}. + * {@code EventConfig}. */ enum EventConfigValue implements KnownConfigValue { @@ -65,7 +65,6 @@ public Object getDefaultValue() { public String getConfigPath() { return path; } - - } + } } diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java index 3053b4156b..b03b6732a7 100755 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceActor.java @@ -585,8 +585,6 @@ private record PersistEventAsync< E extends EventsourcedEvent, S extends Jsonifiable.WithFieldSelectorAndPredicate>(E event, BiConsumer handler) {} - ; - /** * Persist an event, modify actor state by the event strategy, then invoke the handler. * diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java index e7c1b917aa..96374782d4 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java @@ -1069,7 +1069,7 @@ protected CompletionStage filterTargetActorResponseViaEnforcer( } } - private void replyUnavailableException(final Object message, final ActorRef sender) { + protected void replyUnavailableException(final Object message, final ActorRef sender) { log.withCorrelationId(message instanceof WithDittoHeaders withDittoHeaders ? withDittoHeaders : null) .warning("Received message during downtime of child actor for Entity with ID <{}>: <{}>", entityId, message); diff --git a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/AbstractEnforcerActor.java b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/AbstractEnforcerActor.java index 323a4bc603..30cacf9582 100644 --- a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/AbstractEnforcerActor.java +++ b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/AbstractEnforcerActor.java @@ -162,6 +162,7 @@ private void doEnforceSignal(final S signal, final ActorRef sender) { ); }) .thenCompose(this::performWotBasedSignalValidation) + .thenCompose(this::enrichWithPreDefinedExtraFields) .whenComplete((authorizedSignal, throwable) -> { if (null != authorizedSignal) { startedSpan.mark("enforce_success").finish(); @@ -196,13 +197,22 @@ private void doEnforceSignal(final S signal, final ActorRef sender) { /** * Performs an optional WoT based validation of the already {@code authorizedSignal}. * - * @param authorizedSignal the signal to validate against a WoT model. + * @param signal the signal to validate against a WoT model. * @return a CompletionStage finished successfully with the {@code authorizedSignal} when WoT validation was * either not applied or passed successfully. In case of a WoT validation error, exceptionally finished with * a WoT validation exception. */ - protected CompletionStage performWotBasedSignalValidation(final S authorizedSignal) { - return CompletableFuture.completedStage(authorizedSignal); + protected CompletionStage performWotBasedSignalValidation(final S signal) { + return CompletableFuture.completedStage(signal); + } + + /** + * TODO TJ doc + * @param signal + * @return + */ + protected CompletionStage enrichWithPreDefinedExtraFields(final S signal) { + return CompletableFuture.completedStage(signal); } /** diff --git a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/config/EntityCreationConfig.java b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/config/EntityCreationConfig.java index 5a3e7f4caf..834382fb09 100644 --- a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/config/EntityCreationConfig.java +++ b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/config/EntityCreationConfig.java @@ -21,7 +21,7 @@ import org.eclipse.ditto.internal.utils.config.KnownConfigValue; /** - * Provides configuration settings for Concierge entity creation behaviour. + * Provides configuration settings for entity creation behaviour. */ @Immutable public interface EntityCreationConfig { diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultPreDefinedExtraFieldsConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultPreDefinedExtraFieldsConfig.java new file mode 100644 index 0000000000..0bd643e908 --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultPreDefinedExtraFieldsConfig.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.base.model.common.LikeHelper; +import org.eclipse.ditto.internal.utils.config.ConfigWithFallback; +import org.eclipse.ditto.json.JsonFieldSelector; + +import com.typesafe.config.Config; + +/** + * This class implements {@link PreDefinedExtraFieldsConfig}. + */ +@Immutable +public final class DefaultPreDefinedExtraFieldsConfig implements PreDefinedExtraFieldsConfig { + + private final List namespacePatterns; + @Nullable private final String rqlCondition; + private final JsonFieldSelector extraFields; + + private DefaultPreDefinedExtraFieldsConfig(final ConfigWithFallback config) { + this.namespacePatterns = compile(List.copyOf(config.getStringList( + PreDefinedExtraFieldsConfig.ConfigValues.NAMESPACES.getConfigPath()) + )); + this.rqlCondition = config.getStringOrNull(ConfigValues.CONDITION); + final List configuredExtraFields = config.getStringList(ConfigValues.EXTRA_FIELDS.getConfigPath()); + this.extraFields = JsonFieldSelector.newInstance( + configuredExtraFields.getFirst(), + configuredExtraFields.subList(1, configuredExtraFields.size()).toArray(CharSequence[]::new) + ); + } + + /** + * Returns an instance of {@code CreationRestrictionConfig} based on the settings of the specified Config. + * + * @param config is supposed to provide the settings of the restriction config. + * @return the instance. + * @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid. + */ + public static DefaultPreDefinedExtraFieldsConfig of(final Config config) { + return new DefaultPreDefinedExtraFieldsConfig(ConfigWithFallback.newInstance(config, + PreDefinedExtraFieldsConfig.ConfigValues.values())); + } + + private static List compile(final List patterns) { + return patterns.stream() + .map(LikeHelper::convertToRegexSyntax) + .filter(Objects::nonNull) + .map(Pattern::compile) + .toList(); + } + + @Override + public List getNamespace() { + return namespacePatterns; + } + + @Override + public Optional getCondition() { + return Optional.ofNullable(rqlCondition); + } + + @Override + public JsonFieldSelector getExtraFields() { + return extraFields; + } + + @Override + public boolean equals(final Object o) { + if (!(o instanceof final DefaultPreDefinedExtraFieldsConfig that)) { + return false; + } + return Objects.equals(namespacePatterns, that.namespacePatterns) && + Objects.equals(rqlCondition, that.rqlCondition) && + Objects.equals(extraFields, that.extraFields); + } + + @Override + public int hashCode() { + return Objects.hash(namespacePatterns, rqlCondition, extraFields); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[" + + "namespacePatterns=" + namespacePatterns + + ", rqlCondition='" + rqlCondition + '\'' + + ", extraFields=" + extraFields + + "]"; + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java index 649e08d1ca..25c49bed14 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingConfig.java @@ -23,9 +23,7 @@ import org.eclipse.ditto.internal.utils.config.ScopedConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultActivityCheckConfig; -import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultEventConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultSnapshotConfig; -import org.eclipse.ditto.internal.utils.persistence.mongo.config.EventConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig; import org.eclipse.ditto.internal.utils.persistentactors.cleanup.CleanupConfig; @@ -43,7 +41,8 @@ public final class DefaultThingConfig implements ThingConfig { private final SupervisorConfig supervisorConfig; private final ActivityCheckConfig activityCheckConfig; private final SnapshotConfig snapshotConfig; - private final EventConfig eventConfig; + private final ThingEventConfig eventConfig; + private final ThingMessageConfig messageConfig; private final CleanupConfig cleanupConfig; private DefaultThingConfig(final ScopedConfig scopedConfig) { @@ -51,7 +50,8 @@ private DefaultThingConfig(final ScopedConfig scopedConfig) { supervisorConfig = DefaultSupervisorConfig.of(scopedConfig); activityCheckConfig = DefaultActivityCheckConfig.of(scopedConfig); snapshotConfig = DefaultSnapshotConfig.of(scopedConfig); - eventConfig = DefaultEventConfig.of(scopedConfig); + eventConfig = DefaultThingEventConfig.of(scopedConfig); + messageConfig = DefaultThingMessageConfig.of(scopedConfig); cleanupConfig = CleanupConfig.of(scopedConfig); } @@ -87,10 +87,15 @@ public CleanupConfig getCleanupConfig() { } @Override - public EventConfig getEventConfig() { + public ThingEventConfig getEventConfig() { return eventConfig; } + @Override + public ThingMessageConfig getMessageConfig() { + return messageConfig; + } + @Override public Duration getShutdownTimeout() { return shutdownTimeout; @@ -115,8 +120,8 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(supervisorConfig, activityCheckConfig, snapshotConfig, eventConfig, cleanupConfig, - shutdownTimeout); + return Objects.hash(supervisorConfig, activityCheckConfig, snapshotConfig, eventConfig, messageConfig, + cleanupConfig, shutdownTimeout); } @Override @@ -126,6 +131,7 @@ public String toString() { ", activityCheckConfig=" + activityCheckConfig + ", snapshotConfig=" + snapshotConfig + ", eventConfig=" + eventConfig + + ", messageConfig=" + messageConfig + ", cleanupConfig=" + cleanupConfig + ", shutdownTimeout=" + shutdownTimeout + "]"; diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingEventConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingEventConfig.java new file mode 100644 index 0000000000..144349af68 --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingEventConfig.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; +import java.util.Objects; + +import org.eclipse.ditto.internal.utils.config.ConfigWithFallback; +import org.eclipse.ditto.internal.utils.config.ScopedConfig; +import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultEventConfig; + +import com.typesafe.config.Config; + +/** + * Default implementation of {@code ThingEventConfig}. + */ +public final class DefaultThingEventConfig implements ThingEventConfig { + + private static final String CONFIG_PATH = "event"; + + private final DefaultEventConfig defaultEventConfigDelegated; + private final List preDefinedExtraFieldsConfigs; + + private DefaultThingEventConfig(final DefaultEventConfig delegate, final ScopedConfig config) { + this.defaultEventConfigDelegated = delegate; + preDefinedExtraFieldsConfigs = + config.getObjectList(ThingEventConfigValue.PRE_DEFINED_EXTRA_FIELDS.getConfigPath()) + .stream() + .map(configObj -> DefaultPreDefinedExtraFieldsConfig.of(configObj.toConfig())) + .map(PreDefinedExtraFieldsConfig.class::cast) + .toList(); + } + + /** + * Returns an instance of the default event journal config based on the settings of the specified Config. + * + * @param config is supposed to provide the settings of the event journal config at {@value #CONFIG_PATH}. + * @return instance + * @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid. + */ + public static DefaultThingEventConfig of(final Config config) { + return new DefaultThingEventConfig(DefaultEventConfig.of(config), + ConfigWithFallback.newInstance(config, CONFIG_PATH, ThingEventConfigValue.values())); + } + + @Override + public List getHistoricalHeadersToPersist() { + return defaultEventConfigDelegated.getHistoricalHeadersToPersist(); + } + + @Override + public List getPredefinedExtraFieldsConfigs() { + return preDefinedExtraFieldsConfigs; + } + + @Override + public boolean equals(final Object o) { + if (!(o instanceof final DefaultThingEventConfig that)) { + return false; + } + return Objects.equals(defaultEventConfigDelegated, that.defaultEventConfigDelegated) && + Objects.equals(preDefinedExtraFieldsConfigs, that.preDefinedExtraFieldsConfigs); + } + + @Override + public int hashCode() { + return Objects.hash(defaultEventConfigDelegated, preDefinedExtraFieldsConfigs); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[" + + "defaultEventConfigDelegated=" + defaultEventConfigDelegated + + ", preDefinedExtraFieldsConfigs=" + preDefinedExtraFieldsConfigs + + "]"; + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingMessageConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingMessageConfig.java new file mode 100644 index 0000000000..d87e10c9f4 --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/DefaultThingMessageConfig.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; +import java.util.Objects; + +import org.eclipse.ditto.internal.utils.config.ConfigWithFallback; +import org.eclipse.ditto.internal.utils.config.ScopedConfig; + +import com.typesafe.config.Config; + +/** + * Default implementation of {@code ThingMessageConfig}. + */ +public final class DefaultThingMessageConfig implements ThingMessageConfig { + + private static final String CONFIG_PATH = "message"; + + private final List preDefinedExtraFieldsConfigs; + + private DefaultThingMessageConfig(final ScopedConfig config) { + preDefinedExtraFieldsConfigs = + config.getObjectList(ThingMessageConfigValue.PRE_DEFINED_EXTRA_FIELDS.getConfigPath()) + .stream() + .map(configObj -> DefaultPreDefinedExtraFieldsConfig.of(configObj.toConfig())) + .map(PreDefinedExtraFieldsConfig.class::cast) + .toList(); + } + + /** + * Returns an instance of the default event journal config based on the settings of the specified Config. + * + * @param config is supposed to provide the settings of the event journal config at {@value #CONFIG_PATH}. + * @return instance + * @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid. + */ + public static DefaultThingMessageConfig of(final Config config) { + return new DefaultThingMessageConfig( + ConfigWithFallback.newInstance(config, CONFIG_PATH, ThingMessageConfigValue.values()) + ); + } + + @Override + public List getPredefinedExtraFieldsConfigs() { + return preDefinedExtraFieldsConfigs; + } + + @Override + public boolean equals(final Object o) { + if (!(o instanceof final DefaultThingMessageConfig that)) { + return false; + } + return Objects.equals(preDefinedExtraFieldsConfigs, that.preDefinedExtraFieldsConfigs); + } + + @Override + public int hashCode() { + return Objects.hash(preDefinedExtraFieldsConfigs); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[" + + "preDefinedExtraFieldsConfigs=" + preDefinedExtraFieldsConfigs + + "]"; + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/PreDefinedExtraFieldsConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/PreDefinedExtraFieldsConfig.java new file mode 100644 index 0000000000..cca03b223b --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/PreDefinedExtraFieldsConfig.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; +import java.util.Optional; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.internal.utils.config.KnownConfigValue; +import org.eclipse.ditto.json.JsonFieldSelector; + +/** + * Provides a configuration entry for Thing event pre-defined {@code extraFields} injection. + */ +@Immutable +public interface PreDefinedExtraFieldsConfig { + + /** + * The list of namespace {@link Pattern}s this entry applies to. + * An empty list would match any. The pattern must match the full string. + * + * @return the list of values + */ + List getNamespace(); + + /** + * The optional RQL condition which - when evaluating to {@code true} - will apply sending the {@code extraFields}. + * Extra fields will not be injected when the condition evaluates to {@code false}. + * + * @return the optional RQL condition under which circumstances to inject extra fields. + */ + Optional getCondition(); + + /** + * The extra fields in form of {@link JsonFieldSelector} to send along all events in the matching namespaces + * whenever the optional condition matches. + * + * @return the extra fields to send along for thing events. + */ + JsonFieldSelector getExtraFields(); + + /** + * An enumeration of the known config path expressions and their associated default values for + * {@code PreDefinedExtraFieldsConfig}. + */ + enum ConfigValues implements KnownConfigValue { + /** + * Matching namespaces, supports wildcards. + */ + NAMESPACES("namespaces", List.of()), + + /** + * Optional RQL condition. + */ + CONDITION("condition", null), + + /** + * Matching auth subjects. + */ + EXTRA_FIELDS("extra-fields", List.of()); + + private final String path; + private final Object defaultValue; + + ConfigValues(final String thePath, @Nullable final Object theDefaultValue) { + path = thePath; + defaultValue = theDefaultValue; + } + + @Override + public String getConfigPath() { + return path; + } + + @Override + public Object getDefaultValue() { + return defaultValue; + } + + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java index d9dc993651..ed5351b80e 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingConfig.java @@ -18,7 +18,6 @@ import org.eclipse.ditto.base.service.config.supervision.WithSupervisorConfig; import org.eclipse.ditto.internal.utils.config.KnownConfigValue; -import org.eclipse.ditto.internal.utils.persistence.mongo.config.EventConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithActivityCheckConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithSnapshotConfig; import org.eclipse.ditto.internal.utils.persistentactors.cleanup.WithCleanupConfig; @@ -35,7 +34,14 @@ public interface ThingConfig extends WithSupervisorConfig, WithActivityCheckConf * * @return the config. */ - EventConfig getEventConfig(); + ThingEventConfig getEventConfig(); + + /** + * Returns the config regarding thing messages. + * + * @return the config. + */ + ThingMessageConfig getMessageConfig(); /** * Get the timeout waiting for responses and acknowledgements during coordinated shutdown. diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingEventConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingEventConfig.java new file mode 100644 index 0000000000..ba24bd5e57 --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingEventConfig.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; + +import org.eclipse.ditto.internal.utils.config.KnownConfigValue; +import org.eclipse.ditto.internal.utils.persistence.mongo.config.EventConfig; + +/** + * Extends {@link EventConfig} by providing ThingEvent specific additional configuration. + */ +public interface ThingEventConfig extends EventConfig { + + /** + * Contains pre-defined (configured) {@code extraFields} to send along all thing (change) events. + * + * @return the pre-defined {@code extraFields} to send along. + */ + List getPredefinedExtraFieldsConfigs(); + + /** + * An enumeration of the known config path expressions and their associated default values for + * {@code ThingEventConfig}. + */ + enum ThingEventConfigValue implements KnownConfigValue { + + /** + * The pre-defined (configured) {@code extraFields} to send along all events. + */ + PRE_DEFINED_EXTRA_FIELDS("pre-defined-extra-fields", List.of()); + + private final String path; + private final Object defaultValue; + + ThingEventConfigValue(final String thePath, final Object theDefaultValue) { + path = thePath; + defaultValue = theDefaultValue; + } + + @Override + public Object getDefaultValue() { + return defaultValue; + } + + @Override + public String getConfigPath() { + return path; + } + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingMessageConfig.java b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingMessageConfig.java new file mode 100644 index 0000000000..eab214354f --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/common/config/ThingMessageConfig.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.common.config; + +import java.util.List; + +import org.eclipse.ditto.internal.utils.config.KnownConfigValue; + +/** + * Provides "thing message" specific configuration. + */ +public interface ThingMessageConfig { + + /** + * Contains pre-defined (configured) {@code extraFields} to send along all thing (change) messages. + * + * @return the pre-defined {@code extraFields} to send along. + */ + List getPredefinedExtraFieldsConfigs(); + + /** + * An enumeration of the known config path expressions and their associated default values for + * {@code ThingMessageConfig}. + */ + enum ThingMessageConfigValue implements KnownConfigValue { + + /** + * The pre-defined (configured) {@code extraFields} to send along all messages. + */ + PRE_DEFINED_EXTRA_FIELDS("pre-defined-extra-fields", List.of()); + + private final String path; + private final Object defaultValue; + + ThingMessageConfigValue(final String thePath, final Object theDefaultValue) { + path = thePath; + defaultValue = theDefaultValue; + } + + @Override + public Object getDefaultValue() { + return defaultValue; + } + + @Override + public String getConfigPath() { + return path; + } + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/LiveSignalEnforcement.java b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/LiveSignalEnforcement.java index 0421a6169c..bc31aa9cdf 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/LiveSignalEnforcement.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/LiveSignalEnforcement.java @@ -219,9 +219,7 @@ private Signal enforceLiveEvent(final Signal liveSignal, final Enforcer en } } - private Signal enforceMessageCommand(final MessageCommand command, - final Enforcer enforcer) { - + private Signal enforceMessageCommand(final MessageCommand command, final Enforcer enforcer) { if (isAuthorized(command, enforcer)) { return publishMessageCommand(command, enforcer); } else { @@ -229,9 +227,7 @@ private Signal enforceMessageCommand(final MessageCommand command, } } - private Signal publishMessageCommand(final MessageCommand command, - final Enforcer enforcer) { - + private Signal publishMessageCommand(final MessageCommand command, final Enforcer enforcer) { final ResourceKey resourceKey = ResourceKey.newInstance(MessageCommand.RESOURCE_TYPE, command.getResourcePath()); final var effectedSubjects = enforcer.getSubjectsWithPermission(resourceKey, Permission.READ); diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java index 33f8db3394..5de744c022 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java @@ -91,6 +91,8 @@ import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotModifiableException; import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing; import org.eclipse.ditto.things.model.signals.commands.modify.ThingModifyCommand; +import org.eclipse.ditto.things.service.persistence.actors.enrichment.EnrichSignalWithPreDefinedExtraFields; +import org.eclipse.ditto.things.service.persistence.actors.enrichment.EnrichSignalWithPreDefinedExtraFieldsResponse; import org.eclipse.ditto.wot.api.validator.WotThingModelValidator; import org.eclipse.ditto.wot.integration.DittoWotIntegration; @@ -189,9 +191,9 @@ private CompletionStage getDreForMissingPolicyEnforcer(fi } @Override - protected CompletionStage> performWotBasedSignalValidation(final Signal authorizedSignal + protected CompletionStage> performWotBasedSignalValidation(final Signal signal ) { - if (authorizedSignal instanceof MessageCommand messageCommand) { + if (signal instanceof MessageCommand messageCommand) { final var startedSpan = DittoTracing.newPreparedSpan( messageCommand.getDittoHeaders(), SpanOperationName.of("enforce_wot_model message") @@ -208,14 +210,40 @@ protected CompletionStage> performWotBasedSignalValidation(final Signa startedSpan.finish(); }) .thenApply(Function.identity()); - } else if (authorizedSignal instanceof MessageCommandResponse messageCommandResponse) { + } else if (signal instanceof MessageCommandResponse messageCommandResponse) { return doPerformWotBasedMessageCommandResponseValidation(messageCommandResponse) .thenApply(Function.identity()); } else { - return super.performWotBasedSignalValidation(authorizedSignal); + return super.performWotBasedSignalValidation(signal); } } + @Override + protected CompletionStage> enrichWithPreDefinedExtraFields(final Signal signal) { + if (signal instanceof MessageCommand messageCommand) { + return enrichSignalWithPredefinedFieldsAtPersistenceActor(messageCommand) + .thenApply(opt -> opt.orElse(signal)); + } else { + return super.enrichWithPreDefinedExtraFields(signal); + } + } + + private CompletionStage>> enrichSignalWithPredefinedFieldsAtPersistenceActor( + final Signal signal + ) { + return Patterns.ask(getContext().getParent(), + new EnrichSignalWithPreDefinedExtraFields(signal), DEFAULT_LOCAL_ASK_TIMEOUT + ).thenApply(response -> { + if (response instanceof EnrichSignalWithPreDefinedExtraFieldsResponse(Signal enrichedSignal)) { + return Optional.of(enrichedSignal); + } else if (response instanceof ThingNotAccessibleException) { + return Optional.empty(); + } else { + throw new IllegalStateException("expected EnrichSignalWithPreDefinedExtraFieldsResponse, got: " + response); + } + }); + } + @Override protected CompletionStage> performWotBasedResponseValidation( final CommandResponse filteredResponse diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java index 146e4f5c1e..3e58cda310 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java @@ -13,6 +13,8 @@ package org.eclipse.ditto.things.service.persistence.actors; import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import javax.annotation.Nullable; @@ -27,6 +29,7 @@ import org.eclipse.ditto.base.model.headers.LiveChannelTimeoutStrategy; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; import org.eclipse.ditto.base.model.json.JsonSchemaVersion; +import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig; @@ -40,6 +43,8 @@ import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor; import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.json.JsonFactory; +import org.eclipse.ditto.messages.model.signals.commands.MessageCommand; +import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider; import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThing; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingBuilder; @@ -55,6 +60,9 @@ import org.eclipse.ditto.things.model.signals.events.ThingEvent; import org.eclipse.ditto.things.service.common.config.DittoThingsConfig; import org.eclipse.ditto.things.service.common.config.ThingConfig; +import org.eclipse.ditto.things.service.persistence.actors.enrichment.EnrichSignalWithPreDefinedExtraFields; +import org.eclipse.ditto.things.service.persistence.actors.enrichment.EnrichSignalWithPreDefinedExtraFieldsResponse; +import org.eclipse.ditto.things.service.persistence.actors.enrichment.PreDefinedExtraFieldsEnricher; import org.eclipse.ditto.things.service.persistence.actors.strategies.commands.ThingCommandStrategies; import org.eclipse.ditto.things.service.persistence.actors.strategies.events.ThingEventStrategies; @@ -85,12 +93,15 @@ public final class ThingPersistenceActor private final ThingConfig thingConfig; private final DistributedPub> distributedPub; @Nullable private final ActorRef searchShardRegionProxy; + private final PreDefinedExtraFieldsEnricher eventPreDefinedExtraFieldsEnricher; + private final PreDefinedExtraFieldsEnricher messagePreDefinedExtraFieldsEnricher; @SuppressWarnings("unused") private ThingPersistenceActor(final ThingId thingId, final MongoReadJournal mongoReadJournal, final DistributedPub> distributedPub, - @Nullable final ActorRef searchShardRegionProxy) { + @Nullable final ActorRef searchShardRegionProxy, + final PolicyEnforcerProvider policyEnforcerProvider) { super(thingId, mongoReadJournal); final DittoThingsConfig thingsConfig = DittoThingsConfig.of( @@ -99,6 +110,14 @@ private ThingPersistenceActor(final ThingId thingId, thingConfig = thingsConfig.getThingConfig(); this.distributedPub = distributedPub; this.searchShardRegionProxy = searchShardRegionProxy; + this.eventPreDefinedExtraFieldsEnricher = new PreDefinedExtraFieldsEnricher( + thingConfig.getEventConfig().getPredefinedExtraFieldsConfigs(), + policyEnforcerProvider + ); + this.messagePreDefinedExtraFieldsEnricher = new PreDefinedExtraFieldsEnricher( + thingConfig.getMessageConfig().getPredefinedExtraFieldsConfigs(), + policyEnforcerProvider + ); } /** @@ -107,15 +126,19 @@ private ThingPersistenceActor(final ThingId thingId, * @param thingId the Thing ID this Actor manages. * @param mongoReadJournal the ReadJournal used for gaining access to historical values of the thing. * @param distributedPub the distributed-pub access to publish thing events. + * @param searchShardRegionProxy the proxy of the shard region of search updaters. + * @param policyEnforcerProvider a provider for the used Policy {@code Enforcer} which "guards" the + * ThingPersistenceActor for applying access control. * @return the Pekko configuration Props object */ public static Props props(final ThingId thingId, final MongoReadJournal mongoReadJournal, final DistributedPub> distributedPub, - @Nullable final ActorRef searchShardRegionProxy) { - + @Nullable final ActorRef searchShardRegionProxy, + final PolicyEnforcerProvider policyEnforcerProvider + ) { return Props.create(ThingPersistenceActor.class, thingId, mongoReadJournal, distributedPub, - searchShardRegionProxy); + searchShardRegionProxy, policyEnforcerProvider); } @Override @@ -209,6 +232,14 @@ protected boolean entityExistsAsDeleted() { return null != entity && entity.hasLifecycle(ThingLifecycle.DELETED); } + @Override + protected Receive matchAnyAfterInitialization() { + return ReceiveBuilder.create() + .match(EnrichSignalWithPreDefinedExtraFields.class, this::enrichSignalWithPreDefinedExtraFields) + .build() + .orElse(super.matchAnyAfterInitialization()); + } + @Override protected Receive matchAnyWhenDeleted() { return ReceiveBuilder.create() @@ -244,10 +275,24 @@ protected void recoveryCompleted(final RecoveryCompleted event) { @Override protected void publishEvent(@Nullable final Thing previousEntity, final ThingEvent event) { - distributedPub.publishWithAcks(event, entityId, ACK_EXTRACTOR, getSelf()); - if (searchShardRegionProxy != null) { - searchShardRegionProxy.tell(event, getSelf()); - } + final CompletionStage> stage = eventPreDefinedExtraFieldsEnricher.enrichWithPredefinedExtraFields( + entityId, + entity, + Optional.ofNullable(previousEntity).flatMap(Thing::getPolicyId).orElse(null), + event + ); + stage.whenComplete((modifiedEvent, ex) -> { + final ThingEvent eventToPublish; + if (ex != null) { + eventToPublish = event; + } else { + eventToPublish = modifiedEvent; + } + distributedPub.publishWithAcks(eventToPublish, entityId, ACK_EXTRACTOR, getSelf()); + if (searchShardRegionProxy != null) { + searchShardRegionProxy.tell(eventToPublish, getSelf()); + } + }); } @Override @@ -277,4 +322,32 @@ private static Thing enhanceThingWithLifecycle(final Thing thing) { return thingBuilder.build(); } + private void enrichSignalWithPreDefinedExtraFields( + final EnrichSignalWithPreDefinedExtraFields enrichSignalWithPreDefinedExtraFields + ) { + final ActorRef sender = getSender(); + final Signal signal = enrichSignalWithPreDefinedExtraFields.signal(); + final CompletionStage> stage; + switch (signal) { + case MessageCommand messageCommand -> + stage = messagePreDefinedExtraFieldsEnricher.enrichWithPredefinedExtraFields( + entityId, + entity, + Optional.ofNullable(entity).flatMap(Thing::getPolicyId).orElse(null), + messageCommand + ); + case ThingEvent thingEvent -> + stage = eventPreDefinedExtraFieldsEnricher.enrichWithPredefinedExtraFields( + entityId, + entity, + Optional.ofNullable(entity).flatMap(Thing::getPolicyId).orElse(null), + thingEvent + ); + default -> + stage = CompletableFuture.completedStage(signal); + } + stage.thenAccept(modifiedSignal -> + sender.tell(new EnrichSignalWithPreDefinedExtraFieldsResponse(modifiedSignal), getSelf()) + ); + } } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorPropsFactory.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorPropsFactory.java index 95a982170c..66d11cff03 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorPropsFactory.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorPropsFactory.java @@ -14,14 +14,14 @@ import javax.annotation.Nullable; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.Props; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; import org.eclipse.ditto.internal.utils.pubsub.DistributedPub; +import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.events.ThingEvent; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.Props; - /** * Factory of thing-persistence-actor. */ @@ -35,8 +35,10 @@ public interface ThingPersistenceActorPropsFactory { * @param mongoReadJournal the ReadJournal used for gaining access to historical values of the thing. * @param distributedPub the distributed-pub access. * @param searchShardRegionProxy the proxy of the shard region of search updaters. + * @param policyEnforcerProvider a provider for the used Policy {@code Enforcer} which "guards" the + * ThingPersistenceActor for applying access control. * @return Props of the thing-persistence-actor. */ Props props(ThingId thingId, MongoReadJournal mongoReadJournal, DistributedPub> distributedPub, - @Nullable ActorRef searchShardRegionProxy); + @Nullable ActorRef searchShardRegionProxy, PolicyEnforcerProvider policyEnforcerProvider); } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java index 7ee918714a..54a39aea66 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java @@ -12,7 +12,6 @@ */ package org.eclipse.ditto.things.service.persistence.actors; - import java.net.URLDecoder; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -85,6 +84,7 @@ import org.eclipse.ditto.things.service.enforcement.ThingEnforcement; import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor; import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated; +import org.eclipse.ditto.things.service.persistence.actors.enrichment.EnrichSignalWithPreDefinedExtraFields; import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants; /** @@ -399,6 +399,14 @@ private void handleRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) { policyCreatedEvent = null; } + private void enrichSignalWithPreDefinedExtraFields(final EnrichSignalWithPreDefinedExtraFields command) { + if (null != persistenceActorChild) { + persistenceActorChild.forward(command, getContext()); + } else { + replyUnavailableException(command.signal(), getSender()); + } + } + @Override protected ThingId getEntityId() throws Exception { return ThingId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8)); @@ -408,7 +416,7 @@ protected ThingId getEntityId() throws Exception { protected Props getPersistenceActorProps(final ThingId entityId) { assert thingPersistenceActorPropsFactory != null; return thingPersistenceActorPropsFactory.props(entityId, mongoReadJournal, distributedPubThingEventsForTwin, - searchShardRegionProxy); + searchShardRegionProxy, policyEnforcerProvider); } @Override @@ -473,7 +481,9 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha log.withCorrelationId(msg.dittoHeaders()) .info("ThingPolicyCreated msg received: <{}>", msg.policyId()); this.policyCreatedEvent = msg; - }).match(RollbackCreatedPolicy.class, this::handleRollbackCreatedPolicy) + }) + .match(RollbackCreatedPolicy.class, this::handleRollbackCreatedPolicy) + .match(EnrichSignalWithPreDefinedExtraFields.class, this::enrichSignalWithPreDefinedExtraFields) .build() .orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior)); } diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/EnrichSignalWithPreDefinedExtraFields.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/EnrichSignalWithPreDefinedExtraFields.java new file mode 100644 index 0000000000..5541b42e45 --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/EnrichSignalWithPreDefinedExtraFields.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.persistence.actors.enrichment; + +import org.eclipse.ditto.base.model.signals.Signal; + +/** + * A message to enrich a {@link Signal} with pre-defined extra fields. + * + * @param signal the signal to enrich with configured pre-defined extra fields + */ +public record EnrichSignalWithPreDefinedExtraFields(Signal signal) {} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/EnrichSignalWithPreDefinedExtraFieldsResponse.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/EnrichSignalWithPreDefinedExtraFieldsResponse.java new file mode 100644 index 0000000000..8349f9ffd3 --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/EnrichSignalWithPreDefinedExtraFieldsResponse.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.persistence.actors.enrichment; + +import org.eclipse.ditto.base.model.signals.Signal; + +/** + * The response message to {@link EnrichSignalWithPreDefinedExtraFields} containing an enriched {@link Signal} with + * pre-defined extra fields in DittoHeaders. + * + * @param enrichedSignal the enriched signal enriched with configured pre-defined extra fields in DittoHeaders + */ +public record EnrichSignalWithPreDefinedExtraFieldsResponse(Signal enrichedSignal) {} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/PreDefinedExtraFieldsEnricher.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/PreDefinedExtraFieldsEnricher.java new file mode 100644 index 0000000000..d6026ee78b --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/PreDefinedExtraFieldsEnricher.java @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.things.service.persistence.actors.enrichment; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Predicate; +import java.util.stream.StreamSupport; + +import javax.annotation.Nullable; + +import org.eclipse.ditto.base.model.auth.AuthorizationSubject; +import org.eclipse.ditto.base.model.exceptions.InvalidRqlExpressionException; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; +import org.eclipse.ditto.base.model.headers.DittoHeadersSettable; +import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoLogger; +import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; +import org.eclipse.ditto.json.JsonArray; +import org.eclipse.ditto.json.JsonCollectors; +import org.eclipse.ditto.json.JsonFactory; +import org.eclipse.ditto.json.JsonField; +import org.eclipse.ditto.json.JsonFieldSelector; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonObjectBuilder; +import org.eclipse.ditto.json.JsonPointer; +import org.eclipse.ditto.json.JsonValue; +import org.eclipse.ditto.placeholders.HeadersPlaceholder; +import org.eclipse.ditto.placeholders.PlaceholderFactory; +import org.eclipse.ditto.placeholders.TimePlaceholder; +import org.eclipse.ditto.policies.api.Permission; +import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider; +import org.eclipse.ditto.policies.model.Permissions; +import org.eclipse.ditto.policies.model.PoliciesResourceType; +import org.eclipse.ditto.policies.model.PolicyId; +import org.eclipse.ditto.rql.parser.RqlPredicateParser; +import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory; +import org.eclipse.ditto.rql.query.things.ThingPredicateVisitor; +import org.eclipse.ditto.things.model.Thing; +import org.eclipse.ditto.things.model.ThingId; +import org.eclipse.ditto.things.service.common.config.PreDefinedExtraFieldsConfig; + +/** + * Encapsulates functionality in order to perform a "pre-defined" {@code extraFields} enrichment via DittoHeaders of + * fields defined per namespace in the Ditto things configuration. + * + * TODO TJ add unit tests + */ +public final class PreDefinedExtraFieldsEnricher { + + private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(PreDefinedExtraFieldsEnricher.class); + + private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance(); + private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder(); + + private final List preDefinedExtraFieldsConfigs; + private final PolicyEnforcerProvider policyEnforcerProvider; + + /** + * Constructs a new enricher of pre-defined extraFields based on the provided configuration and policy enforcer. + * + * @param preDefinedExtraFieldsConfigs the list of config entries for pre-defined extraFields enrichment + * @param policyEnforcerProvider the policy enforcer to use in order to check permissions for enriching extraFields + */ + public PreDefinedExtraFieldsEnricher( + final List preDefinedExtraFieldsConfigs, + final PolicyEnforcerProvider policyEnforcerProvider + ) { + this.preDefinedExtraFieldsConfigs = List.copyOf(preDefinedExtraFieldsConfigs); + this.policyEnforcerProvider = policyEnforcerProvider; + } + + /** + * Enriches the passed in {@code withDittoHeaders} with pre-defined extraFields based on the provided {@code thing} + * and the global configuration this class holds (based on namespace and optional RQL condition). + * + * @param thingId the Thing ID to enrich for + * @param thing the Thing entity to use for getting extra fields from + * @param policyId the Policy ID to use for looking up permissions + * @param withDittoHeaders the object to enrich with pre-defined extraFields (e.g. a Signal) + * @return an enriched version of the passed in {@code withDittoHeaders} with pre-defined extraFields + * @param the type of the signal to enrich + */ + public > CompletionStage enrichWithPredefinedExtraFields( + final ThingId thingId, + @Nullable final Thing thing, + @Nullable final PolicyId policyId, + final T withDittoHeaders + ) { + if (null != thing && !preDefinedExtraFieldsConfigs.isEmpty()) { + final List matchingPreDefinedFieldsConfigs = + preDefinedExtraFieldsConfigs.stream() + .filter(conf -> conf + .getNamespace().stream() + .anyMatch(pattern -> pattern.matcher(thingId.getNamespace()).matches()) + ) + .filter(applyPredefinedExtraFieldsCondition(thing, withDittoHeaders)) + .toList(); + final JsonFieldSelector combinedPredefinedExtraFields = matchingPreDefinedFieldsConfigs.stream() + .map(PreDefinedExtraFieldsConfig::getExtraFields) + .reduce(JsonFactory.newFieldSelector(List.of()), (a, b) -> { + final Set combinedPointerSet = new LinkedHashSet<>(a.getPointers()); + combinedPointerSet.addAll(b.getPointers()); + return JsonFactory.newFieldSelector(combinedPointerSet); + }); + return buildPredefinedExtraFieldsHeaderReadGrantObject(policyId, combinedPredefinedExtraFields) + .thenApply(predefinedExtraFieldsHeaderReadGrantObject -> + withDittoHeaders.setDittoHeaders(withDittoHeaders.getDittoHeaders() + .toBuilder() + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS.getKey(), + buildPredefinedExtraFieldsHeaderList(combinedPredefinedExtraFields) + ) + .putHeader( + DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_READ_GRANT_OBJECT.getKey(), + predefinedExtraFieldsHeaderReadGrantObject + ) + .putHeader(DittoHeaderDefinition.PRE_DEFINED_EXTRA_FIELDS_OBJECT.getKey(), + buildPredefinedExtraFieldsHeaderObject(thing, + combinedPredefinedExtraFields).toString() + ) + .build() + ) + ); + } else { + return CompletableFuture.completedStage(withDittoHeaders); + } + } + + private Predicate applyPredefinedExtraFieldsCondition( + final Thing thing, + final WithDittoHeaders withDittoHeaders + ) { + return conf -> { + if (conf.getCondition().isEmpty()) { + return true; + } else { + final String rqlCondition = conf.getCondition().get(); + try { + final var criteria = QueryFilterCriteriaFactory + .modelBased(RqlPredicateParser.getInstance()) + .filterCriteria(rqlCondition, withDittoHeaders.getDittoHeaders()); + + final var predicate = ThingPredicateVisitor.apply( + criteria, + PlaceholderFactory.newPlaceholderResolver(TIME_PLACEHOLDER, new Object()), + PlaceholderFactory.newPlaceholderResolver(HEADERS_PLACEHOLDER, + withDittoHeaders.getDittoHeaders()) + ); + return predicate.test(thing); + } catch (final InvalidRqlExpressionException e) { + LOGGER.warn("Encountered invalid RQL condition <{}> for enriching " + + "predefined extra fields: <{}>", rqlCondition, e.getMessage(), e); + return true; + } + } + }; + } + + private static String buildPredefinedExtraFieldsHeaderList(final JsonFieldSelector preDefinedExtraFields) { + return StreamSupport.stream(preDefinedExtraFields.spliterator(), false) + .map(JsonPointer::toString) + .map(JsonValue::of) + .collect(JsonCollectors.valuesToArray()) + .toString(); + } + + private CompletionStage buildPredefinedExtraFieldsHeaderReadGrantObject( + @Nullable final PolicyId policyId, + final JsonFieldSelector preDefinedExtraFields + ) { + return policyEnforcerProvider.getPolicyEnforcer(policyId) + .thenApply(policyEnforcerOpt -> + policyEnforcerOpt.map(policyEnforcer -> + StreamSupport.stream(preDefinedExtraFields.spliterator(), false) + .map(pointer -> { + final JsonArray unrestrictedReadSubjects = policyEnforcer.getEnforcer() + .getSubjectsWithUnrestrictedPermission( + PoliciesResourceType.thingResource(pointer), + Permissions.newInstance(Permission.READ) + ) + .stream() + .map(AuthorizationSubject::getId) + .map(JsonValue::of) + .collect(JsonCollectors.valuesToArray()); + return JsonField.newInstance(pointer.toString(), unrestrictedReadSubjects); + }) + .collect(JsonCollectors.fieldsToObject()) + .toString() + ).orElse("{}") + ); + } + + private static JsonObject buildPredefinedExtraFieldsHeaderObject( + final Thing thing, + final JsonFieldSelector preDefinedExtraFields + ) { + final JsonObjectBuilder builder = JsonObject.newBuilder(); + final JsonObject thingJson = thing.toJson(); + preDefinedExtraFields.getPointers().forEach(pointer -> + thingJson.getValue(pointer).ifPresent(thingValue -> builder.set(pointer, thingValue)) + ); + return builder.build(); + } +} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/package-info.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/package-info.java new file mode 100644 index 0000000000..5e3ba7ea5e --- /dev/null +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/enrichment/package-info.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ + +@org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault +package org.eclipse.ditto.things.service.persistence.actors.enrichment; diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/starter/DefaultThingPersistenceActorPropsFactory.java b/things/service/src/main/java/org/eclipse/ditto/things/service/starter/DefaultThingPersistenceActorPropsFactory.java index c025b424e6..e77d07f945 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/starter/DefaultThingPersistenceActorPropsFactory.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/starter/DefaultThingPersistenceActorPropsFactory.java @@ -17,17 +17,17 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; import org.eclipse.ditto.internal.utils.pubsub.DistributedPub; +import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.events.ThingEvent; import org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor; import org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActorPropsFactory; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; - /** * Factory for creating Props of {@link org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor}. */ @@ -53,9 +53,10 @@ static DefaultThingPersistenceActorPropsFactory of(final ActorSystem actorSystem @Override public Props props(final ThingId thingId, final MongoReadJournal mongoReadJournal, - final DistributedPub> distributedPub, - @Nullable final ActorRef searchShardRegionProxy) { + final DistributedPub> distributedPub, @Nullable final ActorRef searchShardRegionProxy, + final PolicyEnforcerProvider policyEnforcerProvider) { argumentNotEmpty(thingId); - return ThingPersistenceActor.props(thingId, mongoReadJournal, distributedPub, searchShardRegionProxy); + return ThingPersistenceActor.props(thingId, mongoReadJournal, distributedPub, searchShardRegionProxy, + policyEnforcerProvider); } } diff --git a/things/service/src/main/resources/things-dev.conf b/things/service/src/main/resources/things-dev.conf index 352a4695d4..9f7d77d3c0 100755 --- a/things/service/src/main/resources/things-dev.conf +++ b/things/service/src/main/resources/things-dev.conf @@ -7,6 +7,53 @@ ditto { metrics.prometheus.port = 9011 things { + thing { + event { + pre-defined-extra-fields = [ + { + namespaces = [ + "*" + ] + condition = "exists(definition)" + extra-fields = [ + "definition" + ] + }, + { + namespaces = [ + "org.eclipse.ditto.lamps" + ] + extra-fields = [ + "attributes/manufacturer", + "attributes/serial" + ] + } + ] + } + + message { + pre-defined-extra-fields = [ + { + namespaces = [ + "*" + ] + condition = "exists(definition)" + extra-fields = [ + "definition" + ] + }, + { + namespaces = [ + "org.eclipse.ditto.lamps" + ] + extra-fields = [ + "attributes/message-stuff" + ] + } + ] + } + } + wot { tm-model-validation { enabled = true diff --git a/things/service/src/main/resources/things.conf b/things/service/src/main/resources/things.conf index e1ff362256..ef05874b76 100755 --- a/things/service/src/main/resources/things.conf +++ b/things/service/src/main/resources/things.conf @@ -103,6 +103,50 @@ ditto { #"user-agent" # the HTTP user-agent header ] historical-headers-to-persist = ${?THING_EVENT_HISTORICAL_HEADERS_TO_PERSIST} + + pre-defined-extra-fields = [ + # { + # namespaces = [ + # "*" + # ] + # condition = "exists(definition)" + # extra-fields = [ + # "definition" + # ] + # }, + # { + # namespaces = [ + # "org.eclipse.ditto.lamps" + # ] + # extra-fields = [ + # "attributes/manufacturer", + # "attributes/serial" + # ] + # } + ] + } + + message { + pre-defined-extra-fields = [ + # { + # namespaces = [ + # "*" + # ] + # condition = "exists(definition)" + # extra-fields = [ + # "definition" + # ] + # }, + # { + # namespaces = [ + # "org.eclipse.ditto.lamps" + # ] + # extra-fields = [ + # "attributes/manufacturer", + # "attributes/serial" + # ] + # } + ] } supervisor { diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/PersistenceActorTestBase.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/PersistenceActorTestBase.java index a22c5ac664..b7e5a6838b 100755 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/PersistenceActorTestBase.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/PersistenceActorTestBase.java @@ -21,6 +21,13 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import javax.annotation.Nullable; + +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; +import org.apache.pekko.testkit.TestProbe; +import org.apache.pekko.testkit.javadsl.TestKit; import org.eclipse.ditto.base.model.auth.AuthorizationContext; import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory; import org.eclipse.ditto.base.model.auth.AuthorizationSubject; @@ -62,12 +69,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; -import org.apache.pekko.testkit.TestProbe; -import org.apache.pekko.testkit.javadsl.TestKit; - /** * Base test class for testing persistence actors of the things persistence. */ @@ -228,12 +229,13 @@ protected ActorRef createPersistenceActorFor(final ThingId thingId) { protected ActorRef createPersistenceActorWithPubSubFor(final ThingId thingId) { return actorSystem.actorOf(getPropsOfThingPersistenceActor(thingId, Mockito.mock(MongoReadJournal.class), - getDistributedPub())); + getDistributedPub(), null, policyEnforcerProvider)); } private Props getPropsOfThingPersistenceActor(final ThingId thingId, final MongoReadJournal mongoReadJournal, - final DistributedPub> pub) { - return ThingPersistenceActor.props(thingId, mongoReadJournal, pub, null); + final DistributedPub> pub, @Nullable final ActorRef searchShardRegionProxy, + final PolicyEnforcerProvider policyEnforcerProvider) { + return ThingPersistenceActor.props(thingId, mongoReadJournal, pub, searchShardRegionProxy, policyEnforcerProvider); } protected ActorRef createSupervisorActorFor(final ThingId thingId) { @@ -261,8 +263,7 @@ public > Object wrapForPublicationWithAcks(final S messa } }, liveSignalPub, - (thingId1, mongoReadJournal, pub, searchShardRegionProxy) -> getPropsOfThingPersistenceActor( - thingId1, mongoReadJournal, pub), + this::getPropsOfThingPersistenceActor, null, policyEnforcerProvider, Mockito.mock(MongoReadJournal.class)); diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java index 7569e21efc..022027eccb 100755 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java @@ -317,7 +317,7 @@ public void tryToCreateThingWithDifferentThingId() { final CreateThing createThing = CreateThing.of(thing, null, dittoHeadersV2); final Props props = ThingPersistenceActor.props(thingIdOfActor, Mockito.mock(MongoReadJournal.class), - getDistributedPub(), null); + getDistributedPub(), null, policyEnforcerProvider); final TestActorRef underTest = TestActorRef.create(actorSystem, props); final ThingPersistenceActor thingPersistenceActor = underTest.underlyingActor(); final PartialFunction receiveCommand = thingPersistenceActor.receiveCommand(); @@ -2053,7 +2053,7 @@ public void unavailableExpectedAndPolicyIsDeletedIfPersistenceActorFails() { ThingId thingId = getIdOrThrow(thing); ActorRef underTest = createSupervisorActorWithCustomPersistenceActor(thingId, - (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy) -> FailingInCtorActor.props()); + (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy, policyEnforcerProvider) -> FailingInCtorActor.props()); CreateThing createThing = CreateThing.of(thing, null, dittoHeaders); underTest.tell(createThing, getRef()); @@ -2093,7 +2093,7 @@ public void policyShouldNotBeDeletedOnThingRetrieveAndActorFail() { ThingId thingId = getIdOrThrow(thing); ActorRef underTest = createSupervisorActorWithCustomPersistenceActor(thingId, - (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy) -> FailingInCtorActor.props()); + (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy, policyEnforcerProvider) -> FailingInCtorActor.props()); RetrieveThing retrieveThing = RetrieveThing.of(thingId, dittoHeaders); underTest.tell(retrieveThing, getRef()); diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceOperationsActorIT.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceOperationsActorIT.java index bed781abeb..f76d8ef1bc 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceOperationsActorIT.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceOperationsActorIT.java @@ -12,6 +12,12 @@ */ package org.eclipse.ditto.things.service.persistence.actors; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.Props; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.internal.utils.persistence.mongo.ops.eventsource.MongoEventSourceITAssertions; @@ -40,10 +46,6 @@ import com.typesafe.config.Config; -import org.apache.pekko.actor.ActorRef; -import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.Props; - /** * Tests {@link ThingPersistenceOperationsActor} against a local MongoDB. */ @@ -59,6 +61,8 @@ public final class ThingPersistenceOperationsActorIT extends MongoEventSourceITA @Before public void setup() { policyEnforcerProvider = Mockito.mock(PolicyEnforcerProvider.class); + Mockito.when(policyEnforcerProvider.getPolicyEnforcer(Mockito.any())) + .thenReturn(CompletableFuture.completedStage(Optional.empty())); } @Test @@ -150,12 +154,7 @@ public > Object wrapForPublicationWithAcks(final S messa } }, liveSignalPub, - (thingId, mongoReadJournal, distributedPub, searchShardRegionProxy) -> ThingPersistenceActor.props( - thingId, - mongoReadJournal, - distributedPub, - null - ), + ThingPersistenceActor::props, null, policyEnforcerProvider, Mockito.mock(MongoReadJournal.class));