-
Notifications
You must be signed in to change notification settings - Fork 685
Polling 2.0
The main motivation of this feature is to generate a Management service of polls and make of poll the unique way to do polling in mule.
- As a user, I want to execute a given flow :(every hour regardless of the day/week, on Monday, Weds, Friday at 12 AM, Every 5 minutes)
- As a user, I want to be able to try out my poll: (I do not have to wait for the next poll to execute (which may be a day away) to test what will happen, I do not get confused about what is happening because polls continue to execute while I'm trying to debug what is happening)
- As a user, I want to be able to test my poll: (I want to run a unit test that executes a poll, I want to start a test without having polls running.)
- As an extension developer, I want better APIs to start, stop and execute polls.
- As a user, I want to prevent poll executions from executing concurrently
- As an extension developer I want to change the polling strategy on runtime
- As an extension developer I want to start/stop/run my polls
- As an extension developer I want to enrich with notifications of the poll execution.
Nowadays we don't have a mule API in order to manage the polls, this implies that are features that we cannot perform or are hard to develop.
We could have a poll manager. This will handle all types of polling (in the future, for now we can make it handle the poll elements only)
/**
* <p>
* The PollManager is capable of manage all the {@link org.mule.transport.polling.schedulers.PollScheduler} of
* the mule application.
* </p>
* <p>
* It is registered in the {@link org.mule.api.MuleContext} so it can be accessed by just doing
* {@link org.mule.api.MuleContext#getPollManager()}
* </p>
* <p>
* The PollManager allows users to hook interceptors so they can modify/wrap the {@link PollScheduler} that is going to
* be registered or just execute custom code at the registration/unRegistration of a new Scheduler. The PollManager is not
* thought to be an extension point by itself, Mule will provide the PollManager implementation.
* If, for some reason, users change the PollManager implementation it is consider to be done under their own risk.
* Use the interceptors to hook into the PollManager behaviour.
* </p>
* <p>
* The PollManager synchronizes the access to the {@link PollScheduler} that are registered in it.
* </p>
*/
public interface PollManager
{
/**
* <p>
* Adds a new {@link PollScheduler} to the poll manager.
* </p>
* <p>
* As part of the registration process the {@link PollScheduler} is started.
* </p>
* <p>
* If there is a scheduler registered with the same {@link org.mule.transport.polling.schedulers.PollSchedulerID}
* then the registered one is unregistered and is replaced by the new one.
* </p>
*
* @param scheduler The {@link PollScheduler} that needs to be registered from now on.
* @throws StartException In case the {@link PollScheduler} could not be started as part of
* the registration process. The registration process is not completed, so the new
* {@link PollScheduler} will not be registered
* @throws PollSchedulerRegistration If a Scheduler is already registered and it could not be stopped.
* The registration process is not completed, so the new {@link PollScheduler}
* will not be registered
*/
void register(PollScheduler scheduler) throws StartException, PollSchedulerRegistration;
/**
* <p>
* Removes from the PollManager a set of {@link PollScheduler} that matches with a {@link Predicate}
* </p>
* <p>
* The unRegister process of a {@link PollScheduler} stops the scheduler before removing it from the PollManager
* </p>
*
* @param schedulerCriteria The {@link Predicate} criteria that will match with a {@link PollScheduler}
* @return A Collection if the {@link PollScheduler} that could not be removed from the PollManager.
* Empty List if successful
*/
Collection<PollScheduler> unRegister(Predicate schedulerCriteria);
/**
* <p>
* Adds a new interceptor to the PollManager interceptor list. The interceptors are call before the registration
* process and unregistration process. If a {@link PollScheduler} is registered before an interceptor is added.
* Then the interceptor will not have any effect on the registered {@link PollScheduler}
* </p>
* <p>
* The order of the interceptors is equals to the adding order
* </p>
*
* @param interceptor The {@link RegisteringInterceptor} that needs to be added.
*/
void add(RegisteringInterceptor interceptor);
/**
* <p>
* Removes interceptors that matches with the {@link Predicate} criteria
* </p>
*
* @param interceptorCriteria <p>
* The {@link Predicate} matcher to remove an interceptor
* </p>
*/
void remove(Predicate interceptorCriteria);
}
The Scheduler knows when to execute the receiver and for whom.
/**
* <p>
* A PollScheduler is in charge of schedule a task for a poll element.
* </p>
* <p>
* The {@link org.mule.api.lifecycle.Startable#start()} method must start the scheduler which implies executing tasks
* based on a {@link SchedulingStrategy}
* </p>
* <p>
* The {@link org.mule.api.lifecycle.Stoppable#stop()} method ends up finishing all the scheduled tasks
* </p>
*/
public interface PollScheduler extends Stoppable, Startable
{
/**
* @return The Scheduler identifier
*/
PollSchedulerID getID();
/**
* <p>
* Executes a Scheduler Task. The timing of when it is executed depends on the PollScheduler implementation.
* </p>
*
* @throws MuleException In case the execution fails
*/
void execute() throws MuleException;
}
The PollSchedulerId defines the Whom of the scheduler (the flow and endpoint address defines unicity)
/**
* <p>
* Identifier for a {@link PollScheduler}. A PollScheduler schedules task for a poll element this identifier represents
* the location of that poll element.
* </p>
* <p>
* A Single {@link org.mule.construct.Flow} might have multiple Poll {@link org.mule.api.endpoint.InboundEndpoint}, so this
* ID is composed by both elements.
* </p>
*/
public class PollSchedulerID
{
/**
* <p>
* The flow name where the poll element is located
* </p>
*/
protected String flowName;
/**
* <p>
* The {@link org.mule.api.endpoint.InboundEndpoint} address of the PollElement
* </p>
*/
protected EndpointURI endpointURI;
public PollSchedulerID(String flowName, EndpointURI endpointURI)
{
this.flowName = flowName;
this.endpointURI = endpointURI;
}
public String getFlowName()
{
return flowName;
}
public EndpointURI getEndpointURI()
{
return endpointURI;
}
}
```java
/**
* <p>
* Interceptor to be executed on the registration/unregistration process.
* </p>
*/
public interface RegisteringInterceptor
{
/**
* <p>
* Acts before {@PollScheduler} registration
* </p>
*/
PollScheduler onRegister(PollScheduler pollScheduler);
/**
* <p>
* Acts before {@PollScheduler} unRegistration
* </p>
*/
PollScheduler onUnRegister(PollScheduler pollScheduler);
}
Covered scenarios:
- I do not have to wait for the next poll to execute (which may be a day away) to test what will happen
- As an extension developer, I want better APIs to start, stop and execute polls.
- As an extension developer I want to change the polling strategy on runtime
- As an extension user I want to start/stop/run my polls
public class MyPollingService extends AbstractService implements PollingService {
public void executePoll(String appName, final String flowName) throws MuleException
{
MuleContext context = getContext(appName);
PollingManager pollingManager = context.getPollingManager();
Collection<PollScheduler> schedulers = pollingManager.lookupSchedulers(PollSchedulerID.getFlowNamePredicate(flowName))
for (PollScheduler scheduler : schedulers){
scheduler.run();
}
}
}
- I do not get confused about what is happening because polls continue to execute while I'm trying to debug what is happening
- I want to start a test without having polls running.
- As an extension developer I want to change the polling strategy on runtime
- As an extension developer I want to enrich with notifications of the poll execution.
Solution: I can hook an Interceptor to the PollManager that Wraps the implementation of the Scheduler and avoids starting the scheduler at start().
public class BoostrapExtension implements Initialisable, MuleContextAware
{
private MuleContext muleContext;
@Override
public void initialise() throws InitialisationException
{
muleContext.getPollManager().add(new RegisteringInterceptor
{
@Override
public PollScheduler onRegister(final PollScheduler pollScheduler)
{
return new PollScheduler()
{
@Override
public PollSchedulerID getID()
{
return pollScheduler.getID();
}
@Override
public void execute() throws MuleException
{
pollScheduler.execute();
}
@Override
public void start() throws MuleException
{
// Do Nothing
}
@Override
public void stop() throws MuleException
{
// Do Nothing
}
};
}
@Override
public PollScheduler onUnRegister(PollScheduler pollScheduler)
{
return pollScheduler;
}
}
@Override
public void setMuleContext(MuleContext context)
{
this.muleContext = context;
}
}
- Simplifies Stopping/Starting and executing Poll elements
- Opens the possibility of a centralized poller to optimize the polling scheduling resources
- The PollManager allows hooks based on usage and not redefinition (an example of this is the EndpointFactory)
- Separates the When of the How (Receivers are the ones that now how, Schedulers know when)
- Eliminates internally poll as an inbound endpoint.
- Allows poll to receive different scheduling strategies
- Multitenant Apps in mule must not have a source inside the polls
- Might break API backward compatibility
The new implementation of poll allows users to inject the polling strategy.
<flow name="test">
<poll frequency="1000">
<!-- message source here -->
</poll>
</flow>
<flow name="test">
<poll frequency="1000"/>
<mp>
</flow>
Now the quartz module will allow users to define a polling strategy that can be used by poll, and deprecate the quartz inbound endpoint.
<flow name="test">
<poll>
<quartz:polling-strategy cronExpression="" />
<!-- message source here -->
</poll>
</flow>
<flow name="test">
<poll>
<polling-strategy frequency="" />
<!-- message source here -->
</poll>
</flow>
Users can create a bean that defines the polling strategy:
<spring:bean name="customPollingStrategy" class="org.mule.mine.MyPollingStrategyDefinition">
<!-- Attributes if needed -->
</spring:bean>
<flow name="test">
<poll >
<custom-polling-strategy ref="customPollingStrategy"/>
<!-- message source here -->
</poll>
</flow>
And the user can define his own scheduler
public class MyPollingStrategyDefinition implements PollSchedulerBuilder
{
@Override
public PollScheduler buildFor(AbstractPollingMessageReceiver receiver)
{
return createPollScheduler();
}
}
For stateful poll Mule will provide some DefaultSchedulers implementation based on the State strategy. For example:
- QueueScheduler: Queues the tasks in case there are some running
- DissmissScheduler: Dismisses the next tasks in case there is some running
- FailScheduler: Throws an exception in case there are task running an we want to schedule a new one.
This implementations will use a SchedulingStrategy that will notify the next time the schedule needs to be run.
- The scheduler is configurable to the poll, so we eliminate things like quartz inbound endpoint.
- Adds flexibility to the user to define his own scheduling strategy
- Does not affect backward compatibility
- Poll will continue having frequency attribute (but deprecated) which might be confusing.
- Optional Message source might be confusing for the user