[3.5. M3] Summarizers

Mariano Gonzalez edited this page Oct 31, 2013 · 6 revisions

Jira: https://www.mulesoft.org/jira/browse/MULE-7082

Forum discussion: http://forum.mulesoft.org/mulesoft/topics/_3_5_m3_summarizers?rfm=1

Motivation / Context

When working with collections, it is sometimes necessary to run aggregation functions over them. For example, suppose you have a flow with a poll that uses a watermark. In that flow you query Salesforce for contacts. Your flow needs to:

  • Obtain the greatest timestamp for updating the watermark
  • Calculate the average age for all those contacts
  • Add the expected revenue from each contact (custom field)

Currently there is no easy way to do this in Mule. The easiest solution so far is to write a custom transformer or to use DataMapper's output variables (which may be overkill and does not work in CE).

Use cases

  • As a SaaS developer I want to be able to summarize the payload by the following default functions: add, average, count, max, min
  • As a SaaS developer I want to be able to apply several functions to the same payload
  • For each function I want to be required to provide a MEL expression to be used as the input
  • For each function I want to be required to provide a MEL expression to be used as the target (following the same semantics as the enricher)
  • As a SaaS developer I want this to be compatible with payloads of Iterable, Iterator and Array types
  • As a SaaS developer I want this to support streaming mode so that I can use it with large collections or auto paging iterators

Behaviour

This will be implemented as a new CE module called summary.

For each invocation of the extract message processor, new instances of the aggregation functions will be created so that they can be discarded once the information has been extracted.

The functions that will be available by default are:

  • add: Expects the input expression to return a numeric value, which is added to a running total. Return type is Double.
  • average: Expects the input expression to return a numeric value, which is used to calculate an average value. Return type is Double.
  • count: Counts how many items are contained in the payload. Return type is Long.
  • max: Expects the input expression to return an object that implements Comparable and returns the greatest one. The return type is that of the greatest Comparable.
  • min: Expects the input expression to return an object that implements Comparable and returns the lowest one. The return type is that of the lowest Comparable.
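The default functions listed above can be sketched in plain Java as follows. This is only an illustration: the SummaryFunction interface and the class names are hypothetical, since this spec does not define the module's actual contract. The result for an empty payload is also not specified here, so the sketch simply returns NaN for average and null for max and min.

```java
// Hypothetical contract: NOT the module's real interface, only an illustration.
interface SummaryFunction<I, O> {
    void accept(I value); // receives the evaluated input expression for one item
    O result();           // called once, after the last item
}

class Add implements SummaryFunction<Number, Double> {
    private double total;
    public void accept(Number value) { total += value.doubleValue(); }
    public Double result() { return total; }
}

class Average implements SummaryFunction<Number, Double> {
    private double total;
    private long count;
    public void accept(Number value) { total += value.doubleValue(); count++; }
    // Assumption: an empty payload yields NaN (the spec does not say).
    public Double result() { return count == 0 ? Double.NaN : total / count; }
}

class Count implements SummaryFunction<Object, Long> {
    private long count;
    public void accept(Object value) { count++; }
    public Long result() { return count; }
}

class Max<T extends Comparable<T>> implements SummaryFunction<T, T> {
    private T greatest; // stays null if no item was seen
    public void accept(T value) {
        if (greatest == null || value.compareTo(greatest) > 0) greatest = value;
    }
    public T result() { return greatest; }
}

class Min<T extends Comparable<T>> implements SummaryFunction<T, T> {
    private T lowest; // stays null if no item was seen
    public void accept(T value) {
        if (lowest == null || value.compareTo(lowest) < 0) lowest = value;
    }
    public T result() { return lowest; }
}
```

Each function keeps only a small amount of state, which is what makes it cheap to create fresh instances on every invocation and discard them afterwards.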

Syntax

This is an example flow using this feature:

<flow name="polling">
	<poll frequency="5000">
		<watermark variable="pollingJobWatermarkFlag" default-expression="#[new Date()]" update-expression="#[flowVars['timestamp']]"/>	
		<sfdc:query query="..." />
	</poll>

	<summary:extract streaming="[false|true]">
		<summary:max input="#[payload.updatedOn]" target="flowVars['timestamp']" />
		<summary:avg input="#[payload.age]" target="flowVars['averageAge']" />
		<summary:sum input="#[payload.expectedRevenue]" target="flowVars['expectedRevenue']" />
	</summary:extract>

	<foreach>
		<mp:xx />
		<mp:xx />
		<mp:xx />
	</foreach>
</flow>

The default value for the streaming attribute will be false.

Semantics

The semantics of the summary:extract processor change depending on whether streaming mode is being used:

Non-streaming mode (default)

In this case, the payload is assumed to be an Iterable or an Array. When the extract message processor is executed, that payload will be iterated fully so that the aggregation functions can get their result values and set them in the message.

Going back to the example, this means that the timestamp, averageAge and expectedRevenue invocation variables are already available by the time the MuleMessage enters the foreach.

The output of the extract message processor will be exactly the same payload it was invoked with.

If the payload is an Iterator, then streaming mode is forced for this invocation.
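A minimal Java sketch of the non-streaming behaviour described above, under stated assumptions: the Summarizer interface and the EagerExtract class are hypothetical names, and a plain Map stands in for the message's flow variables. It only illustrates the ordering (iterate fully, set the results, return the original payload), not the module's real implementation.

```java
import java.util.Map;

// Hypothetical contract for one configured function; not the module's real API.
interface Summarizer<T> {
    void accept(T item); // consume one element of the payload
    Object result();     // value to store in the target once iteration ends
}

class EagerExtract {
    // Iterates the whole payload up front, stores each result under its target
    // name, and hands back the very same payload object.
    static <T> Iterable<T> extract(Iterable<T> payload,
                                   Map<String, Summarizer<T>> functionsByTarget,
                                   Map<String, Object> flowVars) {
        for (T item : payload) {
            for (Summarizer<T> function : functionsByTarget.values()) {
                function.accept(item);
            }
        }
        for (Map.Entry<String, Summarizer<T>> entry : functionsByTarget.entrySet()) {
            flowVars.put(entry.getKey(), entry.getValue().result());
        }
        return payload;
    }
}
```

Because the payload is walked once here and once more by whatever consumes it next, this approach only works for payloads that can be iterated more than once.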

Streaming mode

The main and most common scenario for this feature is when the payload is an Iterator (meaning we only get one forward-only read). This is the case when using Anypoint Connectors that are capable of auto-paging. This is the reason why streaming mode is automatically enabled when the payload is an Iterator even if streaming="false".

A secondary use case for this feature would be cases in which the payload is an Iterable or Array so big that iterating over it twice would introduce a performance issue.

In streaming mode, the output of the extract processor is no longer the original payload but a custom Iterator that executes the functions on each invocation of the next() method and sets the result values when an invocation of the hasNext() method returns false.

This has the following implications:

  • Going back to the original example, unlike in non-streaming mode, the output variables will not be available when the message enters the foreach. Instead, they will be made available once the foreach finishes iterating.
  • If for whatever reason iteration is aborted and hasNext() never returns false, then the output values will never be made available.
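The streaming behaviour and its implications can be sketched with a wrapping Iterator. As before, Summarizer and SummarizingIterator are hypothetical names and a plain Map stands in for the flow variables; this is an illustration of the described semantics, not the module's actual code.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical contract for one configured function; not the module's real API.
interface Summarizer<T> {
    void accept(T item);
    Object result();
}

class SummarizingIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final Map<String, Summarizer<T>> functionsByTarget;
    private final Map<String, Object> flowVars;
    private boolean published;

    SummarizingIterator(Iterator<T> delegate,
                        Map<String, Summarizer<T>> functionsByTarget,
                        Map<String, Object> flowVars) {
        this.delegate = delegate;
        this.functionsByTarget = functionsByTarget;
        this.flowVars = flowVars;
    }

    @Override
    public boolean hasNext() {
        boolean more = delegate.hasNext();
        // Results are published only once the source is exhausted.
        if (!more && !published) {
            for (Map.Entry<String, Summarizer<T>> entry : functionsByTarget.entrySet()) {
                flowVars.put(entry.getKey(), entry.getValue().result());
            }
            published = true;
        }
        return more;
    }

    @Override
    public T next() {
        T item = delegate.next();
        // Functions run lazily, as each element is pulled by the consumer.
        for (Summarizer<T> function : functionsByTarget.values()) {
            function.accept(item);
        }
        return item;
    }
}
```

If the consumer stops pulling elements before hasNext() returns false, the publishing branch never runs, which matches the second implication above.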

Error handling

  • If any function finds an error condition it will throw an exception and execution of the other functions is aborted. Also, no properties will be set on the original message.
  • The user will get a MessagingException wrapping the original one, as it would with any other message processor.
  • If default mode is being used then the exception will be thrown as soon as a function fails, but if streaming mode is being used then the exception will be thrown upon invocations of Iterator.next() since function execution gets deferred to that moment.
  • The exception thrown will be catchable by a flow exception-strategy as with any other exception.
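The difference in when a failure surfaces can be illustrated with a small Java sketch. DeferredFailure and its two methods are hypothetical names, and the wrapping into a MessagingException is deliberately left out because it is Mule-specific; the sketch only shows the timing of the exception in each mode.

```java
import java.util.Iterator;
import java.util.function.Consumer;

class DeferredFailure {
    // Default mode: the function runs over every item right away, so a
    // failure is thrown from this call.
    static <T> void eager(Iterable<T> source, Consumer<T> function) {
        for (T item : source) {
            function.accept(item);
        }
    }

    // Streaming mode: nothing runs until the consumer pulls an item, so a
    // failure is thrown from next() instead.
    static <T> Iterator<T> streaming(final Iterator<T> source, final Consumer<T> function) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public T next() {
                T item = source.next();
                function.accept(item);
                return item;
            }
        };
    }
}
```

With a function that rejects some item, eager(...) fails immediately, while streaming(...) returns normally and only fails on the next() call that reaches the offending item.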

Extensibility

In case you need to add your very own custom functions we'll provide the following extension point:

<summary:extract>
    <summary:max input="#[payload.updatedOn]" target="flowVars['timestamp']" />
    <summary:custom input="#[payload.someValue]" target="flowVars['sdv']" class="com.myproject.StandardDeviation"/>
    <summary:custom input="#[payload.externalId]" target="flowVars['cycles']" class="com.myproject.CycleDetector" />
</summary:extract>

The example above shows how this functionality can be leveraged for more advanced processing, such as calculating the standard deviation or detecting circular updates in two-way sync scenarios. The output of an aggregation function does not have to be a scalar; it can be a List or any other kind of object.
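As an illustration of what a custom function such as StandardDeviation might look like, here is a minimal Java sketch. The SummaryFunction interface is hypothetical, because this spec does not define the contract a custom class must implement. The sketch uses Welford's online algorithm and computes the population standard deviation; both choices are assumptions made for the example, and the package declaration (com.myproject in the XML above) is omitted.

```java
// Hypothetical contract: the spec does not define what a custom class must implement.
interface SummaryFunction<I, O> {
    void accept(I value); // receives the evaluated input expression for one item
    O result();           // called once, after the last item
}

// Population standard deviation computed in a single pass (Welford's algorithm),
// so it works equally well when items arrive one by one from an Iterator.
class StandardDeviation implements SummaryFunction<Number, Double> {
    private long n;      // number of values seen so far
    private double mean; // running mean
    private double m2;   // running sum of squared deviations from the mean

    public void accept(Number value) {
        double x = value.doubleValue();
        n++;
        double delta = x - mean;
        mean += delta / n;
        m2 += delta * (x - mean);
    }

    // Assumption: an empty payload yields NaN.
    public Double result() {
        return n == 0 ? Double.NaN : Math.sqrt(m2 / n);
    }
}
```

A single-pass algorithm is used because a custom function may be fed from a forward-only Iterator and cannot revisit earlier items.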

Risks

  • User friction in understanding the semantic differences between default and streaming mode

Mule Studio Impact

  • New editor required with support for nested elements, one per function.

DevKit Impact

No impact

MMC Impact

No impact

CH Impact

No impact

Service Registry Impact

No impact

Migration Impact

No impact since this is a new feature

Documentation Impact

  • Add new doc page describing the feature
  • Update training documents