-
Notifications
You must be signed in to change notification settings - Fork 685
Polling 2.0
- As a user, I want to execute a given flow :(every hour regardless of the day/week, on Monday, Weds, Friday at 12 AM, Every 5 minutes)
- As a user, I want to be able to try out my poll: (I do not have to wait for the next poll to execute (which may be a day away) to test what will happen, I do not get confused about what is happening because polls continue to execute while I'm trying to debug what is happening)
- As a user, I want to be able to test my poll: (I want to run a unit test that executes a poll, I want to start a test without having polls running.)
- As an extension developer, I want better APIs to start, stop and execute polls.
- As a user, I want to prevent poll executions from executing concurrently
- As an extension developer I want to change the polling strategy on runtime
- As an extension developer I want to start/stop/run my polls
- As an extension developer I want to enrich with notifications of the poll execution.
Nowadays we don't have a mule java API in order to manage the polls, this implies that are features that we cannot perform or are hard to develop.
We could have a poll manager. This will handle all types of polling (in the future, for now we can make it handle the poll elements only)
/**
* Handles the polls in the application.
*/
public final class PollingManager implements Startable, Stoppable, MuleContextAware
{
protected MuleContext muleContext;
/**
* <p>
* Adds a new polling scheduler
* </p>
*/
public void register(PollScheduler scheduler);
/**
* <p>
* Removes the schedulers that matches comparator equals
* </p>
*/
public void unRegister(Predicate<PollSchedulerId> comparator);
/**
* <p>
* Retrieves a set of schedulers based on the comparator matcher
* </p>
*/
public Collection<PollScheduler> lookupSchedulers(Predicate<PollSchedulerId> comparator);
@Override
public void start() throws MuleException
{
muleContext.fireNotification(new PollingManagerNotification(START_BEGIN, this));
// Starts the schedulers
doStart();
muleContext.fireNotification(new PollingManagerNotification(START_END, this));
}
@Override
public void stop() throws MuleException
{
muleContext.fireNotification(new PollingManagerNotification(STOP_BEGIN, this));
// Stops the schedulers
doStop();
muleContext.fireNotification(new PollingManagerNotification(STOP_END, this));
}
public void setMuleContext(MuleContext context)
{
this.muleContext = context;
}
}
The Scheduler knows when to execute the receiver and for whom.
public interface PollScheduler extends Stoppable, Startable
{
/**
* <p>
* Scheduler identifier
* </p>
*/
PollSchedulerId getId();
/**
* <p>
* Runs a scheduler on demand
* </p>
*/
void run() throws MuleException;
/**
* <p>
* Runs a scheduler on demand with a mule event
* </p>
*/
void run(MuleEvent event) throws MuleException;
}
The PollSchedulerId defines the Whom of the scheduler (the flow and endpoint address defines unicity)
public class PollSchedulerId
{
protected String flowConstructName;
protected EndpointURI endpointURI;
public PollSchedulerId(String flowConstructName, EndpointURI endpointURI)
{
this.flowConstructName = flowConstructName;
this.endpointURI = endpointURI;
}
public String getFlowConstructName()
{
return flowConstructName;
}
public EndpointURI getEndpointURI()
{
return endpointURI;
}
}
Covered scenarios:
- I do not have to wait for the next poll to execute (which may be a day away) to test what will happen
- As an extension developer, I want better APIs to start, stop and execute polls.
- As an extension developer I want to change the polling strategy on runtime
- As an extension user I want to start/stop/run my polls
public class MyPollingService extends AbstractService implements PollingService {
public void executePoll(String appName, final String flowName) throws MuleException
{
MuleContext context = getContext(appName);
PollingManager pollingManager = context.getPollingManager();
Collection<PollScheduler> schedulers = pollingManager.lookupSchedulers(new Predicate<PollSchedulerId>()
{
@Override
public boolean evaluate(PollSchedulerId id)
{
return id.getFlowConstructName().equalsIgnoreCase(flowName);
}
});
for (PollScheduler scheduler : schedulers){
scheduler.run();
}
}
}
h5. Starting with the polls disabled
- I do not get confused about what is happening because polls continue to execute while I'm trying to debug what is happening
- I want to start a test without having polls running.
- As an extension developer I want to change the polling strategy on runtime
- As an extension developer I want to enrich with notifications of the poll execution.
public class BoostrapExtension implements Initialisable, MuleContextAware
{
private MuleContext muleContext;
@Override
public void initialise() throws InitialisationException
{
muleContext.getNotificationManager().addListener(new ServerNotificationListener<PollingManagerNotification>(){
@Override
public void onNotification(PollingManagerNotification notification)
{
if ( isStartBeginOfPollingManager(notification) ){
PollingManager manager = notification.getSource();
Collection<PollScheduler> schedulers = manager.lookupSchedulers(new Predicate<PollSchedulerId>()
{
@Override
public boolean evaluate(PollSchedulerId id)
{
String path = id.getEndpointURI().getPath();
return path.startsWith("polling://");
}
});
for (PollScheduler scheduler : schedulers ){
manager.register(new MyScheduler(scheduler.getId()));
}
}
}
});
}
@Override
public void setMuleContext(MuleContext context)
{
this.muleContext = context;
}
}
h2. XML definition for final users
The new implementation of poll allows users to inject the polling strategy.
h3. The default implementation to support backward compatibility:
<flow name="test">
<poll frequency="1000">
<!-- message source here -->
</poll>
</flow>
h3. Injecting polling strategy by using the quartz module:
Now the quartz module will allow users to define a polling strategy that can be used by poll, and deprecate the quartz inbound endpoint.
<quartz:polling-strategy name="myPollingStrategy" cronExpression="" />
<flow name="test">
<poll polling-strategy-ref="myPollingStrategy">
<!-- message source here -->
</poll>
</flow>
h3. Defining your own polling strategy
Users can create a bean that defines the polling strategy:
<spring:bean name="customPollingStrategy" class="org.mule.mine.MyPollingStrategyDefinition">
<!-- Attributes if needed -->
</spring:bean>
<flow name="test">
<poll polling-strategy-ref="customPollingStrategy">
<!-- message source here -->
</poll>
</flow>
And the user can define his own scheduler
public class MyPollingStrategyDefinition implements PollSchedulerBuilder
{
@Override
public PollScheduler buildFor(AbstractPollingMessageReceiver receiver)
{
return createPollScheduler();
}
}