Skip to content

Commit

Permalink
Improve PR #3819
Browse files Browse the repository at this point in the history
- Move filtering into separate step of the pipeline
- Change method name to 'retain' to make purpose clearer
- Add logging
  • Loading branch information
jbarrez committed Jan 15, 2024
1 parent 895a8e4 commit 6318ff4
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@
public interface InboundEventFilter<T> {

/**
* Returns true, if the event should be further processed or false, if the event can be ignored and will not be processed
* any further and the pipeline will stop afterwards.
* Returns true, if the event should be further processed
* or false if the event should be ignored and will not be processed any further.
*
* @param payload the payload of the event
* @param event the inbound event information
* @return true, if the event should continue to be processed, false, if the pipeline will ignore the event and stop any
* further processing
*/
boolean filter(T payload);
boolean retain(FlowableEventInfo<T> event);

default boolean filter(FlowableEventInfo<T> event) {
return filter(event.getPayload());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,28 +207,64 @@ interface InboundEventProcessingPipelineBuilder {
/**
* Deserializes the event to JSON.
*/
InboundEventKeyJsonDetectorBuilder jsonDeserializer();
InboundEventFilterJsonBuilder jsonDeserializer();

/**
* Deserializes the event to XML.
*/
InboundEventKeyXmlDetectorBuilder xmlDeserializer();
InboundEventFilterXmlBuilder xmlDeserializer();

/**
* Uses a delegate expression to deserialize the event.
*/
InboundEventFilterBuilder delegateExpressionDeserializer(String delegateExpression);

/**
* Uses a delegate expression to determine the custom {@link InboundEventProcessingPipeline} instance.
*/
InboundChannelModelBuilder eventProcessingPipeline(String delegateExpression);

}

/**
* Builder for the filtering out inbound JSON events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterJsonBuilder extends InboundEventKeyJsonDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventProcessingPipelineBuilder delegateExpressionEventFilter(String delegateExpression);
InboundEventKeyJsonDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

/**
* Builder for the filtering out inbound XML events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterXmlBuilder extends InboundEventKeyXmlDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to deserialize the event.
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventKeyDetectorBuilder delegateExpressionDeserializer(String delegateExpression);
InboundEventKeyXmlDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

/**
* Builder for the filtering out inbound events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterBuilder extends InboundEventKeyDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to determine the custom {@link InboundEventProcessingPipeline} instance.
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundChannelModelBuilder eventProcessingPipeline(String delegateExpression);
InboundEventKeyDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,47 +320,89 @@ public InboundEventProcessingPipelineBuilderImpl(InboundChannelModel channelMode
}

@Override
public InboundEventKeyJsonDetectorBuilder jsonDeserializer() {
public InboundEventFilterJsonBuilder jsonDeserializer() {
channelModel.setDeserializerType("json");

InboundEventProcessingPipelineBuilderImpl<JsonNode> jsonPipelineBuilder
= new InboundEventProcessingPipelineBuilderImpl<>(channelModel, eventRepository, channelDefinitionBuilder);
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = jsonPipelineBuilder;

return new InboundEventKeyJsonDetectorBuilderImpl(jsonPipelineBuilder);
return new InboundEventFilterJsonBuilderImpl(jsonPipelineBuilder);
}

@Override
public InboundEventKeyXmlDetectorBuilder xmlDeserializer() {
public InboundEventFilterXmlBuilder xmlDeserializer() {
channelModel.setDeserializerType("xml");
InboundEventProcessingPipelineBuilderImpl<Document> xmlPipelineBuilder
= new InboundEventProcessingPipelineBuilderImpl<>(channelModel, eventRepository, channelDefinitionBuilder);
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = xmlPipelineBuilder;

return new InboundEventKeyXmlDetectorBuilderImpl(xmlPipelineBuilder);
return new InboundEventFilterXmlBuilderImpl(xmlPipelineBuilder);
}

@Override
public InboundEventProcessingPipelineBuilder delegateExpressionEventFilter(String delegateExpression) {
channelModel.setEventFilterDelegateExpression(delegateExpression);
return this;
}

@Override
public InboundEventKeyDetectorBuilder delegateExpressionDeserializer(String delegateExpression) {
public InboundEventFilterBuilder delegateExpressionDeserializer(String delegateExpression) {
channelModel.setDeserializerType("expression");
channelModel.setDeserializerDelegateExpression(delegateExpression);
InboundEventProcessingPipelineBuilderImpl customPipelineBuilder = new InboundEventProcessingPipelineBuilderImpl<>(channelModel,
eventRepository, channelDefinitionBuilder);
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = customPipelineBuilder;
return new InboundEventDefinitionKeyDetectorBuilderImpl(customPipelineBuilder);
return new InboundEventFilterBuilderImpl(customPipelineBuilder);
}

@Override
public InboundChannelModelBuilder eventProcessingPipeline(String delegateExpression) {
this.channelModel.setPipelineDelegateExpression(delegateExpression);
return channelDefinitionBuilder;
}
}

public static class InboundEventFilterJsonBuilderImpl extends InboundEventKeyJsonDetectorBuilderImpl implements InboundEventFilterJsonBuilder {

public InboundEventFilterJsonBuilderImpl(InboundEventProcessingPipelineBuilderImpl<JsonNode> inboundEventProcessingPipelineBuilder) {
super(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventKeyJsonDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
return new InboundEventKeyJsonDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
}

}

public static class InboundEventFilterXmlBuilderImpl extends InboundEventKeyXmlDetectorBuilderImpl implements InboundEventFilterXmlBuilder {

public InboundEventFilterXmlBuilderImpl(InboundEventProcessingPipelineBuilderImpl<Document> inboundEventProcessingPipelineBuilder) {
super(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventKeyXmlDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
return new InboundEventKeyXmlDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
}

}

public static class InboundEventFilterBuilderImpl implements InboundEventFilterBuilder {

protected InboundEventProcessingPipelineBuilderImpl inboundEventProcessingPipelineBuilder;

public InboundEventFilterBuilderImpl(InboundEventProcessingPipelineBuilderImpl inboundEventProcessingPipelineBuilder) {
this.inboundEventProcessingPipelineBuilder = inboundEventProcessingPipelineBuilder;
}

@Override
public InboundEventKeyDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
return new InboundEventDefinitionKeyDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventTenantDetectorBuilder delegateExpressionKeyDetector(String delegateExpression) {
return null;
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ public Collection<EventRegistryEvent> run(InboundChannelModel inboundChannel, In

FlowableEventInfo<T> event = new FlowableEventInfoImpl<>(inboundEvent, deserializedBody, inboundChannel);

// if there is a custom filter in place, invoke it to filter the event for further processing or to abort the pipeline
// if there is a custom filter in place, invoke it to retain only the events that are wanted or to abort the pipeline
if (inboundEventFilter != null) {
if (!inboundEventFilter.filter(event)) {
if (!inboundEventFilter.retain(event)) {
if (debugLoggingEnabled) {
logger.debug("Inbound event {} on inbound {} channel {} was filtered out.", inboundEvent, inboundChannel.getChannelType(), inboundChannel.getKey());
}
return Collections.emptyList();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.EventRegistryEvent;
import org.flowable.eventregistry.api.FlowableEventInfo;
import org.flowable.eventregistry.api.InboundEvent;
import org.flowable.eventregistry.api.InboundEventChannelAdapter;
import org.flowable.eventregistry.api.InboundEventDeserializer;
Expand Down Expand Up @@ -94,8 +95,8 @@ public void testCustomInboundPipelineComponentsInvoked() {
.key("customTestChannel")
.resourceName("customTest.channel")
.channelAdapter("${testInboundChannelAdapter}")
.delegateExpressionEventFilter("${testInboundEventFilter}")
.delegateExpressionDeserializer("${testInboundEventDeserializer}")
.delegateExpressionEventFilter("${testInboundEventFilter}")
.delegateExpressionKeyDetector("${testInboundEventKeyDetector}")
.delegateExpressionTenantDetector("${testInboundEventTenantDetector}")
.payloadExtractor("${testInboundEventPayloadExtractor}")
Expand Down Expand Up @@ -143,8 +144,8 @@ public void testCustomInboundEventDroppingFilter() {
.key("customTestChannel")
.resourceName("customTest.channel")
.channelAdapter("${testInboundChannelAdapter}")
.delegateExpressionEventFilter("${testInboundEventFilter}")
.delegateExpressionDeserializer("${testInboundEventDeserializer}")
.delegateExpressionEventFilter("${testInboundEventFilter}")
.delegateExpressionKeyDetector("${testInboundEventKeyDetector}")
.delegateExpressionTenantDetector("${testInboundEventTenantDetector}")
.payloadExtractor("${testInboundEventPayloadExtractor}")
Expand Down Expand Up @@ -408,7 +409,7 @@ private TestInboundEventFilter(boolean filter) {
}

@Override
public boolean filter(String payload) {
public boolean retain(FlowableEventInfo<String> event) {
counter.incrementAndGet();
return filter;
}
Expand Down

0 comments on commit 6318ff4

Please sign in to comment.