Skip to content

Commit

Permalink
Add event filter after key detection, as the key is often used in fil…
Browse files Browse the repository at this point in the history
…tering
  • Loading branch information
jbarrez committed Jan 15, 2024
1 parent d8f57c0 commit d190b4d
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ public interface InboundEventFilter<T> {
* Returns true, if the event should be further processed
* or false if the event should be ignored and will not be processed any further.
*
* @param eventDefinitionKey the key for the inbound event
* @param event the inbound event information
* @return true, if the event should continue to be processed, false, if the pipeline will ignore the event and stop any
* further processing
*/
boolean retain(FlowableEventInfo<T> event);
boolean retain(String eventDefinitionKey, FlowableEventInfo<T> event);

}
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,17 @@ interface InboundEventProcessingPipelineBuilder {
/**
* Deserializes the event to JSON.
*/
InboundEventFilterJsonBuilder jsonDeserializer();
InboundEventKeyJsonDetectorBuilder jsonDeserializer();

/**
* Deserializes the event to XML.
*/
InboundEventFilterXmlBuilder xmlDeserializer();
InboundEventKeyXmlDetectorBuilder xmlDeserializer();

/**
* Uses a delegate expression to deserialize the event.
*/
InboundEventFilterBuilder delegateExpressionDeserializer(String delegateExpression);
InboundEventKeyDetectorBuilder delegateExpressionDeserializer(String delegateExpression);

/**
* Uses a delegate expression to determine the custom {@link InboundEventProcessingPipeline} instance.
Expand All @@ -226,48 +226,6 @@ interface InboundEventProcessingPipelineBuilder {

}

/**
* Builder for the filtering out inbound JSON events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterJsonBuilder extends InboundEventKeyJsonDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventKeyJsonDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

/**
* Builder for the filtering out inbound XML events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterXmlBuilder extends InboundEventKeyXmlDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventKeyXmlDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

/**
* Builder for the filtering out inbound events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterBuilder extends InboundEventKeyDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventKeyDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

/**
* Builder for the 'key detection' part of the {@link InboundChannelModel}, specifically for JSON events.
*/
Expand All @@ -276,17 +234,17 @@ interface InboundEventKeyJsonDetectorBuilder {
/**
* Sets the event key to a hardcoded value. This is useful when the channel only receives one type of event.
*/
InboundEventTenantJsonDetectorBuilder fixedEventKey(String key);
InboundEventFilterJsonBuilder fixedEventKey(String key);

/**
* Determines the key of the event based on a top-level field in the JSON representation in the event.
*/
InboundEventTenantJsonDetectorBuilder detectEventKeyUsingJsonField(String field);
InboundEventFilterJsonBuilder detectEventKeyUsingJsonField(String field);

/**
* Determines the key of the event using on a JSON Pointer expression to find the value.
*/
InboundEventTenantJsonDetectorBuilder detectEventKeyUsingJsonPointerExpression(String jsonPointerExpression);
InboundEventFilterJsonBuilder detectEventKeyUsingJsonPointerExpression(String jsonPointerExpression);

}

Expand Down Expand Up @@ -315,12 +273,12 @@ interface InboundEventKeyXmlDetectorBuilder {
/**
* Sets the event key to a hardcoded value. This is useful when the channel only receives one type of event.
*/
InboundEventTenantXmlDetectorBuilder fixedEventKey(String key);
InboundEventFilterXmlBuilder fixedEventKey(String key);

/**
* Determines the key of the event using on a XPATH expression to find the value.
*/
InboundEventTenantXmlDetectorBuilder detectEventKeyUsingXPathExpression(String xPathExpression);
InboundEventFilterXmlBuilder detectEventKeyUsingXPathExpression(String xPathExpression);

}

Expand Down Expand Up @@ -349,7 +307,49 @@ interface InboundEventKeyDetectorBuilder {
/**
* Uses delegate expression to determine the custom {@link InboundEventKeyDetector}.
*/
InboundEventTenantDetectorBuilder delegateExpressionKeyDetector(String delegateExpression);
InboundEventFilterBuilder delegateExpressionKeyDetector(String delegateExpression);

}

/**
* Builder for the filtering out inbound JSON events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterJsonBuilder extends InboundEventTenantJsonDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventTenantJsonDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

/**
* Builder for the filtering out inbound XML events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterXmlBuilder extends InboundEventTenantXmlDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventTenantXmlDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

/**
* Builder for the filtering out inbound events.
* If events are filtered out, the pipeline processing will stop there and no subsequent steps will be executed.
*/
interface InboundEventFilterBuilder extends InboundEventTenantDetectorBuilder { // Extends because using filtering is optional

/**
* Uses a delegate expression to filter the events before further processing.
* The expression needs to resolve to a bean implementing the {@link org.flowable.eventregistry.api.InboundEventFilter} interface.
*/
InboundEventTenantDetectorBuilder delegateExpressionEventFilter(String delegateExpression);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,34 +320,34 @@ public InboundEventProcessingPipelineBuilderImpl(InboundChannelModel channelMode
}

@Override
public InboundEventFilterJsonBuilder jsonDeserializer() {
public InboundEventKeyJsonDetectorBuilder jsonDeserializer() {
channelModel.setDeserializerType("json");

InboundEventProcessingPipelineBuilderImpl<JsonNode> jsonPipelineBuilder
= new InboundEventProcessingPipelineBuilderImpl<>(channelModel, eventRepository, channelDefinitionBuilder);
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = jsonPipelineBuilder;

return new InboundEventFilterJsonBuilderImpl(jsonPipelineBuilder);
return new InboundEventKeyJsonDetectorBuilderImpl(jsonPipelineBuilder);
}

@Override
public InboundEventFilterXmlBuilder xmlDeserializer() {
public InboundEventKeyXmlDetectorBuilder xmlDeserializer() {
channelModel.setDeserializerType("xml");
InboundEventProcessingPipelineBuilderImpl<Document> xmlPipelineBuilder
= new InboundEventProcessingPipelineBuilderImpl<>(channelModel, eventRepository, channelDefinitionBuilder);
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = xmlPipelineBuilder;

return new InboundEventFilterXmlBuilderImpl(xmlPipelineBuilder);
return new InboundEventKeyXmlDetectorBuilderImpl(xmlPipelineBuilder);
}

@Override
public InboundEventFilterBuilder delegateExpressionDeserializer(String delegateExpression) {
public InboundEventKeyDetectorBuilder delegateExpressionDeserializer(String delegateExpression) {
channelModel.setDeserializerType("expression");
channelModel.setDeserializerDelegateExpression(delegateExpression);
InboundEventProcessingPipelineBuilderImpl customPipelineBuilder = new InboundEventProcessingPipelineBuilderImpl<>(channelModel,
eventRepository, channelDefinitionBuilder);
this.channelDefinitionBuilder.inboundEventProcessingPipelineBuilder = customPipelineBuilder;
return new InboundEventFilterBuilderImpl(customPipelineBuilder);
return new InboundEventDefinitionKeyDetectorBuilderImpl(customPipelineBuilder);
}

@Override
Expand All @@ -357,51 +357,44 @@ public InboundChannelModelBuilder eventProcessingPipeline(String delegateExpress
}
}

public static class InboundEventFilterJsonBuilderImpl extends InboundEventKeyJsonDetectorBuilderImpl implements InboundEventFilterJsonBuilder {
public static class InboundEventFilterJsonBuilderImpl extends InboundEventTenantJsonDetectorBuilderImpl implements InboundEventFilterJsonBuilder {

public InboundEventFilterJsonBuilderImpl(InboundEventProcessingPipelineBuilderImpl<JsonNode> inboundEventProcessingPipelineBuilder) {
super(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventKeyJsonDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
public InboundEventTenantJsonDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
return new InboundEventKeyJsonDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
return new InboundEventTenantJsonDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
}

}

public static class InboundEventFilterXmlBuilderImpl extends InboundEventKeyXmlDetectorBuilderImpl implements InboundEventFilterXmlBuilder {
public static class InboundEventFilterXmlBuilderImpl extends InboundEventTenantXmlDetectorBuilderImpl implements InboundEventFilterXmlBuilder {

public InboundEventFilterXmlBuilderImpl(InboundEventProcessingPipelineBuilderImpl<Document> inboundEventProcessingPipelineBuilder) {
super(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventKeyXmlDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
public InboundEventTenantXmlDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
return new InboundEventKeyXmlDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
return new InboundEventTenantXmlDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
}

}

public static class InboundEventFilterBuilderImpl implements InboundEventFilterBuilder {

protected InboundEventProcessingPipelineBuilderImpl inboundEventProcessingPipelineBuilder;
public static class InboundEventFilterBuilderImpl extends InboundEventTenantDetectorBuilderImpl implements InboundEventFilterBuilder {

public InboundEventFilterBuilderImpl(InboundEventProcessingPipelineBuilderImpl inboundEventProcessingPipelineBuilder) {
this.inboundEventProcessingPipelineBuilder = inboundEventProcessingPipelineBuilder;
super(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventKeyDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
public InboundEventTenantDetectorBuilder delegateExpressionEventFilter(String delegateExpression) {
inboundEventProcessingPipelineBuilder.channelModel.setEventFilterDelegateExpression(delegateExpression);
return new InboundEventDefinitionKeyDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventTenantDetectorBuilder delegateExpressionKeyDetector(String delegateExpression) {
return null;
return new InboundEventTenantDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
}

}
Expand All @@ -415,27 +408,27 @@ public InboundEventKeyJsonDetectorBuilderImpl(InboundEventProcessingPipelineBuil
}

@Override
public InboundEventTenantJsonDetectorBuilder fixedEventKey(String key) {
public InboundEventFilterJsonBuilder fixedEventKey(String key) {
ChannelEventKeyDetection keyDetection = new ChannelEventKeyDetection();
keyDetection.setFixedValue(key);
this.inboundEventProcessingPipelineBuilder.channelModel.setChannelEventKeyDetection(keyDetection);
return new InboundEventTenantJsonDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
return new InboundEventFilterJsonBuilderImpl(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventTenantJsonDetectorBuilder detectEventKeyUsingJsonField(String field) {
public InboundEventFilterJsonBuilder detectEventKeyUsingJsonField(String field) {
ChannelEventKeyDetection keyDetection = new ChannelEventKeyDetection();
keyDetection.setJsonField(field);
this.inboundEventProcessingPipelineBuilder.channelModel.setChannelEventKeyDetection(keyDetection);
return new InboundEventTenantJsonDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
return new InboundEventFilterJsonBuilderImpl(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventTenantJsonDetectorBuilder detectEventKeyUsingJsonPointerExpression(String jsonPointerExpression) {
public InboundEventFilterJsonBuilder detectEventKeyUsingJsonPointerExpression(String jsonPointerExpression) {
ChannelEventKeyDetection keyDetection = new ChannelEventKeyDetection();
keyDetection.setJsonPointerExpression(jsonPointerExpression);
this.inboundEventProcessingPipelineBuilder.channelModel.setChannelEventKeyDetection(keyDetection);
return new InboundEventTenantJsonDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
return new InboundEventFilterJsonBuilderImpl(inboundEventProcessingPipelineBuilder);
}
}

Expand All @@ -448,19 +441,19 @@ public InboundEventKeyXmlDetectorBuilderImpl(InboundEventProcessingPipelineBuild
}

@Override
public InboundEventTenantXmlDetectorBuilder fixedEventKey(String key) {
public InboundEventFilterXmlBuilder fixedEventKey(String key) {
ChannelEventKeyDetection keyDetection = new ChannelEventKeyDetection();
keyDetection.setFixedValue(key);
this.inboundEventProcessingPipelineBuilder.channelModel.setChannelEventKeyDetection(keyDetection);
return new InboundEventTenantXmlDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
return new InboundEventFilterXmlBuilderImpl(inboundEventProcessingPipelineBuilder);
}

@Override
public InboundEventTenantXmlDetectorBuilder detectEventKeyUsingXPathExpression(String xPathExpression) {
public InboundEventFilterXmlBuilder detectEventKeyUsingXPathExpression(String xPathExpression) {
ChannelEventKeyDetection keyDetection = new ChannelEventKeyDetection();
keyDetection.setXmlXPathExpression(xPathExpression);
this.inboundEventProcessingPipelineBuilder.channelModel.setChannelEventKeyDetection(keyDetection);
return new InboundEventTenantXmlDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
return new InboundEventFilterXmlBuilderImpl(inboundEventProcessingPipelineBuilder);
}

}
Expand All @@ -474,11 +467,11 @@ public InboundEventDefinitionKeyDetectorBuilderImpl(InboundEventProcessingPipeli
}

@Override
public InboundEventTenantDetectorBuilder delegateExpressionKeyDetector(String delegateExpression) {
public InboundEventFilterBuilder delegateExpressionKeyDetector(String delegateExpression) {
ChannelEventKeyDetection keyDetection = new ChannelEventKeyDetection();
keyDetection.setDelegateExpression(delegateExpression);
inboundEventProcessingPipelineBuilder.channelModel.setChannelEventKeyDetection(keyDetection);
return new InboundEventTenantDetectorBuilderImpl(inboundEventProcessingPipelineBuilder);
return new InboundEventFilterBuilderImpl(inboundEventProcessingPipelineBuilder);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ public Collection<EventRegistryEvent> run(InboundChannelModel inboundChannel, In

FlowableEventInfo<T> event = new FlowableEventInfoImpl<>(inboundEvent, deserializedBody, inboundChannel);

String eventKey = detectEventDefinitionKey(event);

// if there is a custom filter in place, invoke it to retain only the events that are wanted or to abort the pipeline
if (inboundEventFilter != null) {
if (!inboundEventFilter.retain(event)) {
if (!inboundEventFilter.retain(eventKey, event)) {
if (debugLoggingEnabled) {
logger.debug("Inbound event {} on inbound {} channel {} was filtered out.", inboundEvent, inboundChannel.getChannelType(), inboundChannel.getKey());
}
return Collections.emptyList();
}
}

String eventKey = detectEventDefinitionKey(event);

boolean multiTenant = false;
String tenantId = AbstractEngineConfiguration.NO_TENANT_ID;
if (inboundEventTenantDetector != null) {
Expand Down
Loading

0 comments on commit d190b4d

Please sign in to comment.