Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding option to have an inbound event filter in the default event inbound pipeline #3819

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.eventregistry.api;

/**
* This interface must be implemented by a custom filtering bean to hook into the default inbound event processing
* pipeline in order to have a very effective way to filter out events which should not be processed any further and
* thus preventing expensive processing time like DB lookups and the full consumer invocation.
*
* @param <T> the type of the expected payload (e.g. a JsonNode)
*
* @author Micha Kiener
*/
@FunctionalInterface
public interface InboundEventFilter<T> {

/**
* Returns true, if the event should be further processed or false, if the event can be ignored and will not be processed
* any further and the pipeline will stop afterwards.
*
* @param payload the payload of the event
* @return true, if the event should continue to be processed, false, if the pipeline will ignore the event and stop any
* further processing
*/
boolean filter(T payload);

default boolean filter(FlowableEventInfo<T> event) {
return filter(event.getPayload());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ interface InboundEventProcessingPipelineBuilder {
*/
InboundEventKeyXmlDetectorBuilder xmlDeserializer();

/**
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventProcessingPipelineBuilder delegateExpressionEventFilter(String delegateExpression);

/**
* Uses a delegate expression to deserialize the event.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ protected void validateModel(ChannelModel channelModel) {
assertThat(model.getDestination()).isEqualTo("customer");
assertThat(model.getDeserializerType()).isEqualTo("json");

assertThat(model.getEventFilterDelegateExpression()).isEqualTo("testEventFilterExpression");

ChannelEventKeyDetection eventKeyDetection = model.getChannelEventKeyDetection();
assertThat(eventKeyDetection).isNotNull();
assertThat(eventKeyDetection.getFixedValue()).isNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ protected void validateModel(ChannelModel channelModel) {

assertThat(model.getDeserializerType()).isEqualTo("json");

assertThat(model.getEventFilterDelegateExpression()).isEqualTo("testEventFilterExpression");

ChannelEventKeyDetection eventKeyDetection = model.getChannelEventKeyDetection();
assertThat(eventKeyDetection).isNotNull();
assertThat(eventKeyDetection.getFixedValue()).isNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ protected void validateModel(ChannelModel channelModel) {

assertThat(model.getDeserializerType()).isEqualTo("json");

assertThat(model.getEventFilterDelegateExpression()).isEqualTo("testEventFilterExpression");

ChannelEventKeyDetection eventKeyDetection = model.getChannelEventKeyDetection();
assertThat(eventKeyDetection).isNotNull();
assertThat(eventKeyDetection.getFixedValue()).isNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"subscription": "testSubscription",
"concurrency": "3",
"deserializerType": "json",
"eventFilterDelegateExpression": "testEventFilterExpression",
"channelEventKeyDetection": {
"jsonField": "eventKey"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
}
],

"eventFilterDelegateExpression": "testEventFilterExpression",

"deserializerType": "json",
"channelEventKeyDetection": {
"jsonField": "eventKey"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"executor": "rabbitTaskExecutor",
"ackMode": "NONE",

"eventFilterDelegateExpression": "testEventFilterExpression",

"deserializerType": "json",
"channelEventKeyDetection": {
"jsonField": "eventKey"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class InboundChannelModel extends ChannelModel {

protected String contextExtractorDelegateExpression;
protected String deserializerDelegateExpression;
protected String eventFilterDelegateExpression;
protected String payloadExtractorDelegateExpression;
protected String headerExtractorDelegateExpression;
protected String eventTransformerDelegateExpression;
Expand All @@ -48,6 +49,14 @@ public void setDeserializerType(String deserializerType) {
this.deserializerType = deserializerType;
}

public String getEventFilterDelegateExpression() {
return eventFilterDelegateExpression;
}

public void setEventFilterDelegateExpression(String eventFilterDelegateExpression) {
this.eventFilterDelegateExpression = eventFilterDelegateExpression;
}

public String getContextExtractorDelegateExpression() {
return contextExtractorDelegateExpression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ public InboundEventKeyXmlDetectorBuilder xmlDeserializer() {
return new InboundEventKeyXmlDetectorBuilderImpl(xmlPipelineBuilder);
}

@Override
public InboundEventProcessingPipelineBuilder delegateExpressionEventFilter(String delegateExpression) {
channelModel.setEventFilterDelegateExpression(delegateExpression);
return this;
}

@Override
public InboundEventKeyDetectorBuilder delegateExpressionDeserializer(String delegateExpression) {
channelModel.setDeserializerType("expression");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
package org.flowable.eventregistry.impl.pipeline;

import java.util.Collection;
import java.util.Collections;

import org.flowable.common.engine.impl.AbstractEngineConfiguration;
import org.flowable.eventregistry.api.EventRegistryEvent;
import org.flowable.eventregistry.api.EventRepositoryService;
import org.flowable.eventregistry.api.FlowableEventInfo;
import org.flowable.eventregistry.api.InboundEvent;
import org.flowable.eventregistry.api.InboundEventDeserializer;
import org.flowable.eventregistry.api.InboundEventFilter;
import org.flowable.eventregistry.api.InboundEventKeyDetector;
import org.flowable.eventregistry.api.InboundEventPayloadExtractor;
import org.flowable.eventregistry.api.InboundEventProcessingPipeline;
Expand All @@ -44,20 +46,23 @@ public class DefaultInboundEventProcessingPipeline<T> implements InboundEventPro

protected EventRepositoryService eventRepositoryService;
protected InboundEventDeserializer<T> inboundEventDeserializer;
protected InboundEventFilter<T> inboundEventFilter;
protected InboundEventKeyDetector<T> inboundEventKeyDetector;
protected InboundEventTenantDetector<T> inboundEventTenantDetector;
protected InboundEventPayloadExtractor<T> inboundEventPayloadExtractor;
protected InboundEventTransformer inboundEventTransformer;

public DefaultInboundEventProcessingPipeline(EventRepositoryService eventRepositoryService,
InboundEventDeserializer<T> inboundEventDeserializer,
InboundEventFilter<T> inboundEventFilter,
InboundEventKeyDetector<T> inboundEventKeyDetector,
InboundEventTenantDetector<T> inboundEventTenantDetector,
InboundEventPayloadExtractor<T> inboundEventPayloadExtractor,
InboundEventTransformer inboundEventTransformer) {

this.eventRepositoryService = eventRepositoryService;
this.inboundEventDeserializer = inboundEventDeserializer;
this.inboundEventFilter = inboundEventFilter;
this.inboundEventKeyDetector = inboundEventKeyDetector;
this.inboundEventTenantDetector = inboundEventTenantDetector;
this.inboundEventPayloadExtractor = inboundEventPayloadExtractor;
Expand All @@ -75,7 +80,14 @@ public Collection<EventRegistryEvent> run(InboundChannelModel inboundChannel, In
T deserializedBody = deserialize(inboundEvent.getBody());

FlowableEventInfo<T> event = new FlowableEventInfoImpl<>(inboundEvent, deserializedBody, inboundChannel);


// if there is a custom filter in place, invoke it to filter the event for further processing or to abort the pipeline
if (inboundEventFilter != null) {
if (!inboundEventFilter.filter(event)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a debug logging statement here?

return Collections.emptyList();
}
}

String eventKey = detectEventDefinitionKey(event);

boolean multiTenant = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.EventRepositoryService;
import org.flowable.eventregistry.api.InboundEventDeserializer;
import org.flowable.eventregistry.api.InboundEventFilter;
import org.flowable.eventregistry.api.InboundEventKeyDetector;
import org.flowable.eventregistry.api.InboundEventPayloadExtractor;
import org.flowable.eventregistry.api.InboundEventProcessingPipeline;
Expand Down Expand Up @@ -128,6 +129,12 @@ protected InboundEventProcessingPipeline createJsonEventProcessingPipeline(Inbou
eventDeserializer = resolveExpression(channelModel.getDeserializerDelegateExpression(), InboundEventDeserializer.class);
}

// by default, there is not filtering of events in place
InboundEventFilter<JsonNode> eventFilter = null;
if (StringUtils.isNotEmpty(channelModel.getEventFilterDelegateExpression())) {
eventFilter = resolveExpression(channelModel.getEventFilterDelegateExpression(), InboundEventFilter.class);
}

InboundEventTenantDetector<JsonNode> eventTenantDetector = null; // By default no multi-tenancy is applied

InboundEventPayloadExtractor<JsonNode> eventPayloadExtractor = createInboundEventPayloadExtractor(channelModel, JsonFieldToMapPayloadExtractor::new);
Expand Down Expand Up @@ -177,7 +184,7 @@ protected InboundEventProcessingPipeline createJsonEventProcessingPipeline(Inbou
}
}

return new DefaultInboundEventProcessingPipeline<>(eventRepositoryService, eventDeserializer,
return new DefaultInboundEventProcessingPipeline<>(eventRepositoryService, eventDeserializer, eventFilter,
eventKeyDetector, eventTenantDetector, eventPayloadExtractor, eventTransformer);
}

Expand All @@ -192,6 +199,12 @@ protected InboundEventProcessingPipeline createXmlEventProcessingPipeline(Inboun
eventDeserializer = resolveExpression(channelModel.getDeserializerDelegateExpression(), InboundEventDeserializer.class);
}

// by default, there is not filtering of events in place
InboundEventFilter<Document> eventFilter = null;
if (StringUtils.isNotEmpty(channelModel.getEventFilterDelegateExpression())) {
eventFilter = resolveExpression(channelModel.getEventFilterDelegateExpression(), InboundEventFilter.class);
}

InboundEventTenantDetector<Document> eventTenantDetector = null; // By default no multi-tenancy is applied

InboundEventPayloadExtractor<Document> eventPayloadExtractor = createInboundEventPayloadExtractor(channelModel, XmlElementsToMapPayloadExtractor::new);
Expand Down Expand Up @@ -239,7 +252,7 @@ protected InboundEventProcessingPipeline createXmlEventProcessingPipeline(Inboun
}
}

return new DefaultInboundEventProcessingPipeline<>(eventRepositoryService, eventDeserializer,
return new DefaultInboundEventProcessingPipeline<>(eventRepositoryService, eventDeserializer, eventFilter,
eventKeyDetector, eventTenantDetector, eventPayloadExtractor, eventTransformer);
}

Expand Down Expand Up @@ -284,6 +297,12 @@ protected InboundEventProcessingPipeline createExpressionEventProcessingPipeline
+ " was empty. The deserializerDelegateExpression has to be provided for a channel with an expression deserializer.");
}

// by default, there is not filtering of events in place
InboundEventFilter<?> eventFilter = null;
if (StringUtils.isNotEmpty(channelModel.getEventFilterDelegateExpression())) {
eventFilter = resolveExpression(channelModel.getEventFilterDelegateExpression(), InboundEventFilter.class);
}

InboundEventTenantDetector<?> eventTenantDetector = null; // By default no multi-tenancy is applied

InboundEventPayloadExtractor<?> eventPayloadExtractor;
Expand Down Expand Up @@ -343,7 +362,7 @@ protected InboundEventProcessingPipeline createExpressionEventProcessingPipeline
}

//noinspection unchecked
return new DefaultInboundEventProcessingPipeline(eventRepositoryService, eventDeserializer,
return new DefaultInboundEventProcessingPipeline(eventRepositoryService, eventDeserializer, eventFilter,
eventKeyDetector, eventTenantDetector, eventPayloadExtractor, eventTransformer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.flowable.eventregistry.api.InboundEvent;
import org.flowable.eventregistry.api.InboundEventChannelAdapter;
import org.flowable.eventregistry.api.InboundEventDeserializer;
import org.flowable.eventregistry.api.InboundEventFilter;
import org.flowable.eventregistry.api.InboundEventKeyDetector;
import org.flowable.eventregistry.api.InboundEventPayloadExtractor;
import org.flowable.eventregistry.api.InboundEventProcessingPipeline;
Expand Down Expand Up @@ -74,6 +75,7 @@ public void testCustomInboundPipelineComponentsInvoked() {

TestInboundChannelAdapter testInboundChannelAdapter = new TestInboundChannelAdapter();
TestInboundEventDeserializer testInboundEventDeserializer = new TestInboundEventDeserializer();
TestInboundEventFilter testInboundEventFilter = new TestInboundEventFilter(true);
TestInboundEventKeyDetector testInboundEventKeyDetector = new TestInboundEventKeyDetector();
TestInboundEventTenantDetector testInboundEventTenantDetector = new TestInboundEventTenantDetector();
TestInboundEventPayloadExtractor testInboundEventPayloadExtractor = new TestInboundEventPayloadExtractor();
Expand All @@ -82,6 +84,7 @@ public void testCustomInboundPipelineComponentsInvoked() {
Map<Object, Object> beans = eventEngineConfiguration.getExpressionManager().getBeans();
beans.put("testInboundChannelAdapter", testInboundChannelAdapter);
beans.put("testInboundEventDeserializer", testInboundEventDeserializer);
beans.put("testInboundEventFilter", testInboundEventFilter);
beans.put("testInboundEventKeyDetector", testInboundEventKeyDetector);
beans.put("testInboundEventTenantDetector", testInboundEventTenantDetector);
beans.put("testInboundEventPayloadExtractor", testInboundEventPayloadExtractor);
Expand All @@ -91,6 +94,7 @@ public void testCustomInboundPipelineComponentsInvoked() {
.key("customTestChannel")
.resourceName("customTest.channel")
.channelAdapter("${testInboundChannelAdapter}")
.delegateExpressionEventFilter("${testInboundEventFilter}")
.delegateExpressionDeserializer("${testInboundEventDeserializer}")
.delegateExpressionKeyDetector("${testInboundEventKeyDetector}")
.delegateExpressionTenantDetector("${testInboundEventTenantDetector}")
Expand All @@ -107,13 +111,63 @@ public void testCustomInboundPipelineComponentsInvoked() {
testInboundChannelAdapter.trigger("testEvent");

assertThat(testInboundEventDeserializer.counter.get()).isEqualTo(1);
assertThat(testInboundEventFilter.counter.get()).isEqualTo(1);
assertThat(testInboundEventKeyDetector.counter.get()).isEqualTo(1);
assertThat(testInboundEventTenantDetector.counter.get()).isEqualTo(1);
assertThat(testInboundEventPayloadExtractor.payloadCounter.get()).isEqualTo(1);
assertThat(testInboundEventTransformer.counter.get()).isEqualTo(1);

}

@Test
public void testCustomInboundEventDroppingFilter() {

TestInboundChannelAdapter testInboundChannelAdapter = new TestInboundChannelAdapter();
TestInboundEventDeserializer testInboundEventDeserializer = new TestInboundEventDeserializer();
TestInboundEventFilter testInboundEventFilter = new TestInboundEventFilter(false);
TestInboundEventKeyDetector testInboundEventKeyDetector = new TestInboundEventKeyDetector();
TestInboundEventTenantDetector testInboundEventTenantDetector = new TestInboundEventTenantDetector();
TestInboundEventPayloadExtractor testInboundEventPayloadExtractor = new TestInboundEventPayloadExtractor();
TestInboundEventTransformer testInboundEventTransformer = new TestInboundEventTransformer();

Map<Object, Object> beans = eventEngineConfiguration.getExpressionManager().getBeans();
beans.put("testInboundChannelAdapter", testInboundChannelAdapter);
beans.put("testInboundEventDeserializer", testInboundEventDeserializer);
beans.put("testInboundEventFilter", testInboundEventFilter);
beans.put("testInboundEventKeyDetector", testInboundEventKeyDetector);
beans.put("testInboundEventTenantDetector", testInboundEventTenantDetector);
beans.put("testInboundEventPayloadExtractor", testInboundEventPayloadExtractor);
beans.put("testInboundEventTransformer", testInboundEventTransformer);

eventRegistryEngine.getEventRepositoryService().createInboundChannelModelBuilder()
.key("customTestChannel")
.resourceName("customTest.channel")
.channelAdapter("${testInboundChannelAdapter}")
.delegateExpressionEventFilter("${testInboundEventFilter}")
.delegateExpressionDeserializer("${testInboundEventDeserializer}")
.delegateExpressionKeyDetector("${testInboundEventKeyDetector}")
.delegateExpressionTenantDetector("${testInboundEventTenantDetector}")
.payloadExtractor("${testInboundEventPayloadExtractor}")
.transformer("${testInboundEventTransformer}")
.deploy();

eventRegistryEngine.getEventRepositoryService().createEventModelBuilder()
.key("testKey")
.deploymentTenantId("testTenantId")
.resourceName("myEvent.event")
.deploy();

testInboundChannelAdapter.trigger("testEvent");

assertThat(testInboundEventDeserializer.counter.get()).isEqualTo(1);
assertThat(testInboundEventFilter.counter.get()).isEqualTo(1);
assertThat(testInboundEventKeyDetector.counter.get()).isEqualTo(0);
assertThat(testInboundEventTenantDetector.counter.get()).isEqualTo(0);
assertThat(testInboundEventPayloadExtractor.payloadCounter.get()).isEqualTo(0);
assertThat(testInboundEventTransformer.counter.get()).isEqualTo(0);

}

@Test
public void testCustomInboundPipelineInvoked() {

Expand Down Expand Up @@ -344,6 +398,23 @@ public String deserialize(Object rawEvent) {

}

private static class TestInboundEventFilter implements InboundEventFilter<String> {

public AtomicInteger counter = new AtomicInteger(0);
protected final boolean filter;

private TestInboundEventFilter(boolean filter) {
this.filter = filter;
}

@Override
public boolean filter(String payload) {
counter.incrementAndGet();
return filter;
}
}


private static class TestInboundEventKeyDetector implements InboundEventKeyDetector<String> {

public AtomicInteger counter = new AtomicInteger(0);
Expand Down