fernandofederico1984 edited this page Jun 18, 2013 · 30 revisions

Motivation

The main motivation of this feature is to provide a management service for polls and to make poll the only way to do polling in Mule.

Use cases

  1. As a user, I want to execute a given flow on a schedule (every hour regardless of the day or week; on Monday, Wednesday and Friday at 12 AM; every 5 minutes).
  2. As a user, I want to be able to try out my poll: I do not have to wait for the next poll to execute (which may be a day away) to test what will happen, and I do not get confused because polls continue to execute while I'm trying to debug.
  3. As a user, I want to be able to test my poll: I want to run a unit test that executes a poll, and I want to start a test without having polls running.
  4. As an extension developer, I want better APIs to start, stop and execute polls.
  5. As a user, I want to prevent poll executions from running concurrently.
  6. As an extension developer, I want to change the polling strategy at runtime.
  7. As an extension developer, I want to start/stop/run my polls.
  8. As an extension developer, I want to enrich poll execution with notifications.

Design

Mule API to support extension developer's features.

Currently there is no Mule API for managing polls, which means that some features cannot be implemented or are hard to develop.

Polling Manager and Scheduler

We could have a poll manager that handles all types of polling. That is the long-term goal; for now it can handle the poll elements only.

Definition

/**
 * <p>
 * The PollManager manages all the {@link org.mule.transport.polling.schedulers.PollScheduler} instances of
 * the Mule application.
 * </p>
 * <p>
 * It is registered in the {@link org.mule.api.MuleContext} so it can be accessed by just doing
 * {@link org.mule.api.MuleContext#getPollManager()}
 * </p>
 * <p>
 * The PollManager allows users to hook interceptors so they can modify/wrap the {@link PollScheduler} that is going to
 * be registered, or just execute custom code at the registration/unregistration of a Scheduler. The PollManager is not
 * meant to be an extension point by itself; Mule will provide the PollManager implementation.
 * Users who replace the PollManager implementation do so at their own risk.
 * Use the interceptors to hook into the PollManager behaviour.
 * </p>
 * <p>
 * The PollManager synchronizes access to the {@link PollScheduler} instances that are registered in it.
 * </p>
 */
public interface PollManager
{

    /**
     * <p>
     * Adds a new {@link PollScheduler} to the poll manager.
     * </p>
     * <p>
     * As part of the registration process the {@link PollScheduler} is started.
     * </p>
     * <p>
     * If there is a scheduler registered with the same {@link org.mule.transport.polling.schedulers.PollSchedulerID}
     * then the registered one is unregistered and is replaced by the new one.
     * </p>
     *
     * @param scheduler The {@link PollScheduler} that needs to be registered from now on.
     * @throws StartException            In case the {@link PollScheduler} could not be started as part of
     *                                   the registration process. The registration process is not completed, so the new
     *                                   {@link PollScheduler} will not be registered
     * @throws PollSchedulerRegistration If a Scheduler is already registered and it could not be stopped.
     *                                   The registration process is not completed, so the new {@link PollScheduler}
     *                                   will not be registered
     */
    void register(PollScheduler scheduler) throws StartException, PollSchedulerRegistration;


    /**
     * <p>
     * Removes from the PollManager a set of {@link PollScheduler} that matches with a {@link Predicate}
     * </p>
     * <p>
     * The unRegister process of a {@link PollScheduler} stops the scheduler before removing it from the PollManager
     * </p>
     *
     * @param schedulerCriteria The {@link Predicate} criteria that will match with a {@link PollScheduler}
     * @return A Collection of the {@link PollScheduler} instances that could not be removed from the PollManager.
     *         Empty List if successful
     */
    Collection<PollScheduler> unRegister(Predicate schedulerCriteria);


    /**
     * <p>
     * Adds a new interceptor to the PollManager interceptor list. The interceptors are called before the registration
     * process and the unregistration process. If a {@link PollScheduler} is registered before an interceptor is added,
     * the interceptor will not have any effect on the already registered {@link PollScheduler}.
     * </p>
     * <p>
     * Interceptors are invoked in the order in which they were added.
     * </p>
     *
     * @param interceptor The {@link RegisteringInterceptor} that needs to be added.
     */
    void add(RegisteringInterceptor interceptor);


    /**
     * <p>
     * Removes the interceptors that match the {@link Predicate} criteria
     * </p>
     *
     * @param interceptorCriteria <p>
     *                            The {@link Predicate} matcher to remove an interceptor
     *                            </p>
     */
    void remove(Predicate interceptorCriteria);
}
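
The registration rules described in the Javadoc above (interceptors applied in insertion order, replacement of a scheduler with the same ID, start on register, stop on unregister) can be illustrated with a minimal in-memory sketch. It is not the Mule implementation. All type names below are hypothetical stand-ins, a plain String replaces PollSchedulerID, `java.util.function.Predicate` is assumed for the unspecified `Predicate` type, and checked exceptions such as StartException are replaced by unchecked ones to keep the example short.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Stand-in for PollScheduler; a String id replaces PollSchedulerID.
interface SketchScheduler {
    String getId();
    void start();
    void stop();
}

// Stand-in for RegisteringInterceptor.
interface SketchInterceptor {
    SketchScheduler onRegister(SketchScheduler scheduler);
    SketchScheduler onUnRegister(SketchScheduler scheduler);
}

// Hypothetical manager; every method is synchronized to serialize access to the schedulers.
class InMemoryPollManager {
    private final Map<String, SketchScheduler> schedulers = new LinkedHashMap<>();
    private final List<SketchInterceptor> interceptors = new ArrayList<>();

    public synchronized void add(SketchInterceptor interceptor) {
        interceptors.add(interceptor);
    }

    public synchronized void register(SketchScheduler scheduler) {
        SketchScheduler candidate = scheduler;
        for (SketchInterceptor interceptor : interceptors) {
            candidate = interceptor.onRegister(candidate); // applied in insertion order
        }
        SketchScheduler previous = schedulers.get(candidate.getId());
        if (previous != null) {
            previous.stop(); // if this throws, the previous scheduler stays registered
            schedulers.remove(candidate.getId());
        }
        candidate.start(); // if this throws, the new scheduler is not registered
        schedulers.put(candidate.getId(), candidate);
    }

    // Returns the schedulers that matched but could not be stopped; empty on success.
    public synchronized Collection<SketchScheduler> unRegister(Predicate<SketchScheduler> criteria) {
        List<SketchScheduler> failed = new ArrayList<>();
        for (SketchScheduler scheduler : new ArrayList<>(schedulers.values())) {
            if (!criteria.test(scheduler)) {
                continue;
            }
            try {
                SketchScheduler target = scheduler;
                for (SketchInterceptor interceptor : interceptors) {
                    target = interceptor.onUnRegister(target);
                }
                target.stop();
                schedulers.remove(scheduler.getId());
            } catch (RuntimeException e) {
                failed.add(scheduler);
            }
        }
        return failed;
    }

    public synchronized int size() {
        return schedulers.size();
    }
}

// Minimal scheduler used to exercise the manager; it only records its lifecycle state.
class RecordingScheduler implements SketchScheduler {
    private final String id;
    boolean running;

    RecordingScheduler(String id) { this.id = id; }
    public String getId() { return id; }
    public void start() { running = true; }
    public void stop() { running = false; }
}
```

Registering a second `RecordingScheduler` with the same id stops and replaces the first one, which mirrors the replacement rule stated for `register`.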

The Scheduler knows when to execute the receiver and for whom.

/**
 * <p>
 * A PollScheduler is in charge of scheduling a task for a poll element.
 * </p>
 * <p>
 * The {@link org.mule.api.lifecycle.Startable#start()} method must start the scheduler which implies executing tasks
 * based on a {@link SchedulingStrategy}
 * </p>
 * <p>
 * The {@link org.mule.api.lifecycle.Stoppable#stop()} method finishes all the scheduled tasks
 * </p>
 */
public interface PollScheduler extends Stoppable, Startable
{

    /**
     * @return The Scheduler identifier
     */
    PollSchedulerID getID();

    /**
     * <p>
     * Executes a Scheduler Task. The timing of when it is executed depends on the PollScheduler implementation.
     * </p>
     *
     * @throws MuleException In case the execution fails
     */
    void execute() throws MuleException;
}

The PollSchedulerID defines the "whom" of the scheduler. The flow name and the endpoint address together make it unique.

/**
 * <p>
 * Identifier for a {@link PollScheduler}. A PollScheduler schedules tasks for a poll element; this identifier represents
 * the location of that poll element.
 * </p>
 * <p>
 * A single {@link org.mule.construct.Flow} might have multiple poll {@link org.mule.api.endpoint.InboundEndpoint}s, so this
 * ID is composed of both elements.
 * </p>
 */
public class PollSchedulerID
{

    /**
     * <p>
     * The flow name where the poll element is located
     * </p>
     */
    protected String flowName;

    /**
     * <p>
     * The {@link org.mule.api.endpoint.InboundEndpoint} address of the PollElement
     * </p>
     */
    protected EndpointURI endpointURI;

    public PollSchedulerID(String flowName, EndpointURI endpointURI)
    {
        this.flowName = flowName;
        this.endpointURI = endpointURI;
    }

    public String getFlowName()
    {
        return flowName;
    }

    public EndpointURI getEndpointURI()
    {
        return endpointURI;
    }
}
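
Since `register` replaces a scheduler that has "the same" PollSchedulerID, the identifier presumably needs value-based equality, which the class above does not define. The following is a minimal sketch of how that could look, not the actual Mule class. `PollSchedulerKey` is a hypothetical name and a plain String stands in for EndpointURI so that the example is self-contained.

```java
import java.util.Objects;

// Sketch of a value-based identifier; endpointAddress is a String standing in for EndpointURI.
final class PollSchedulerKey {
    private final String flowName;
    private final String endpointAddress;

    PollSchedulerKey(String flowName, String endpointAddress) {
        this.flowName = Objects.requireNonNull(flowName, "flowName");
        this.endpointAddress = Objects.requireNonNull(endpointAddress, "endpointAddress");
    }

    public String getFlowName() { return flowName; }

    public String getEndpointAddress() { return endpointAddress; }

    // Two keys are equal when both the flow name and the endpoint address match.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof PollSchedulerKey)) {
            return false;
        }
        PollSchedulerKey that = (PollSchedulerKey) other;
        return flowName.equals(that.flowName) && endpointAddress.equals(that.endpointAddress);
    }

    // Must be consistent with equals so the key works in hash-based collections.
    @Override
    public int hashCode() {
        return Objects.hash(flowName, endpointAddress);
    }

    @Override
    public String toString() {
        return flowName + "@" + endpointAddress;
    }
}
```

With equality defined this way, the identifier can be used directly as a map key inside a manager.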


```java
/**
 * <p>
 * Interceptor to be executed on the registration/unregistration process.
 * </p>
 */
public interface RegisteringInterceptor
{

    /**
     * <p>
     * Acts before {@link PollScheduler} registration
     * </p>
     */
    PollScheduler onRegister(PollScheduler pollScheduler);

    /**
     * <p>
     * Acts before {@link PollScheduler} unregistration
     * </p>
     */
    PollScheduler onUnRegister(PollScheduler pollScheduler);
}
```

Usage

Starting/running/stopping polls.

Covered scenarios:

  1. I do not have to wait for the next poll to execute (which may be a day away) to test what will happen
  2. As an extension developer, I want better APIs to start, stop and execute polls.
  3. As an extension developer I want to change the polling strategy at runtime
  4. As an extension user I want to start/stop/run my polls

public class MyPollingService extends AbstractService implements PollingService {

    public void executePoll(String appName, final String flowName) throws MuleException
    {
        MuleContext context = getContext(appName);
        PollManager pollManager = context.getPollManager();

        // Note: lookupSchedulers and getFlowNamePredicate are assumed helpers; they are not part of
        // the PollManager and PollSchedulerID definitions shown above.
        Collection<PollScheduler> schedulers = pollManager.lookupSchedulers(PollSchedulerID.getFlowNamePredicate(flowName));

        for (PollScheduler scheduler : schedulers)
        {
            // Trigger the poll immediately instead of waiting for its next scheduled run
            scheduler.execute();
        }
    }
}

Starting with the polls disabled

  1. I do not get confused about what is happening because polls continue to execute while I'm trying to debug what is happening
  2. I want to start a test without having polls running.
  3. As an extension developer I want to change the polling strategy at runtime
  4. As an extension developer I want to enrich poll execution with notifications.

Solution: hook an interceptor into the PollManager that wraps the Scheduler implementation and does not start the underlying scheduler on start().

public class BootstrapExtension implements Initialisable, MuleContextAware
{

    private MuleContext muleContext;

    @Override
    public void initialise() throws InitialisationException
    {
        muleContext.getPollManager().add(new RegisteringInterceptor()
        {
            @Override
            public PollScheduler onRegister(final PollScheduler pollScheduler)
            {
                // Wrap the scheduler so that start()/stop() do nothing while execute() still works
                return new PollScheduler()
                {
                    @Override
                    public PollSchedulerID getID()
                    {
                        return pollScheduler.getID();
                    }

                    @Override
                    public void execute() throws MuleException
                    {
                        pollScheduler.execute();
                    }

                    @Override
                    public void start() throws MuleException
                    {
                        // Do Nothing
                    }

                    @Override
                    public void stop() throws MuleException
                    {
                        // Do Nothing
                    }
                };
            }

            @Override
            public PollScheduler onUnRegister(PollScheduler pollScheduler)
            {
                return pollScheduler;
            }
        });
    }

    @Override
    public void setMuleContext(MuleContext context)
    {
        this.muleContext = context;
    }
}
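
The same wrapping technique could cover the notification scenario listed above. The sketch below shows only the decorator that an interceptor's onRegister might return. `ExecutablePoll`, `PollExecutionListener` and the other names are hypothetical and not part of the API described on this page, and execute() is declared without the checked MuleException to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the parts of PollScheduler that matter here.
interface ExecutablePoll {
    String getId();
    void execute();
}

// Hypothetical listener; the page does not define a notification API.
interface PollExecutionListener {
    void beforeExecute(String schedulerId);
    void afterExecute(String schedulerId, boolean succeeded);
}

// Decorator that notifies a listener around every execution of the wrapped poll.
class NotifyingPoll implements ExecutablePoll {
    private final ExecutablePoll delegate;
    private final PollExecutionListener listener;

    NotifyingPoll(ExecutablePoll delegate, PollExecutionListener listener) {
        this.delegate = delegate;
        this.listener = listener;
    }

    public String getId() {
        return delegate.getId();
    }

    public void execute() {
        listener.beforeExecute(delegate.getId());
        boolean succeeded = false;
        try {
            delegate.execute();
            succeeded = true;
        } finally {
            // Always notify, even when the wrapped poll throws
            listener.afterExecute(delegate.getId(), succeeded);
        }
    }
}

// Simple poll whose body is a Runnable, used to try out the decorator.
class SimplePoll implements ExecutablePoll {
    private final String id;
    private final Runnable body;

    SimplePoll(String id, Runnable body) {
        this.id = id;
        this.body = body;
    }

    public String getId() { return id; }
    public void execute() { body.run(); }
}

// Listener that records events in memory, which is convenient in unit tests.
class RecordingListener implements PollExecutionListener {
    final List<String> events = new ArrayList<>();

    public void beforeExecute(String schedulerId) {
        events.add("before:" + schedulerId);
    }

    public void afterExecute(String schedulerId, boolean succeeded) {
        events.add("after:" + schedulerId + ":" + succeeded);
    }
}
```

Because the exception from the wrapped poll is rethrown after the listener is notified, callers still see the original failure.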

Pros

  1. Simplifies Stopping/Starting and executing Poll elements
  2. Opens the possibility of a centralized poller to optimize the polling scheduling resources
  3. The PollManager allows hooks based on usage and not redefinition (an example of this is the EndpointFactory)
  4. Separates the when from the how (Receivers know how, Schedulers know when)
  5. Internally, poll is no longer an inbound endpoint.
  6. Allows poll to receive different scheduling strategies

Cons

  1. Multitenant Apps in mule must not have a source inside the polls
  2. Might break API backward compatibility

XML definition for final users

The new implementation of poll allows users to inject the polling strategy.

The default implementation to support backward compatibility:

<flow name="test">
   <poll frequency="1000">
       <!-- message source here -->
   </poll>

</flow>

The message source is now optional, so the following is acceptable:

<flow name="test">
   <poll frequency="1000"/>
   <!-- message processors here -->
</flow>

Injecting polling strategy by using the quartz module:

The quartz module will now allow users to define a polling strategy that can be used by poll, and the quartz inbound endpoint will be deprecated.

<flow name="test">
   <poll>
     <quartz:polling-strategy cronExpression="" />
       <!-- message source here -->
   </poll>

</flow>

Using new improved mule polling strategy:

<flow name="test">
   <poll>
     <polling-strategy frequency="" />
       <!-- message source here -->
   </poll>

</flow>

Defining your own polling strategy

Users can create a bean that defines the polling strategy:

<spring:bean name="customPollingStrategy" class="org.mule.mine.MyPollingStrategyDefinition">
     <!-- Attributes if needed -->
</spring:bean>

<flow name="test">
   <poll >
       <custom-polling-strategy ref="customPollingStrategy"/>
       <!-- message source here -->
   </poll>

</flow>

The user can then define their own scheduler:

public class MyPollingStrategyDefinition implements PollSchedulerBuilder
{
    @Override
    public PollScheduler buildFor(AbstractPollingMessageReceiver receiver)
    {
        return createPollScheduler();
    }

}
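
The builder above leaves createPollScheduler() undefined. As an illustration only, a fixed-frequency scheduler that such a builder might return could look like the sketch below. It assumes the JDK's ScheduledExecutorService as the timing mechanism and a plain Runnable standing in for the receiver's poll. `FixedFrequencyPollScheduler` is a hypothetical name, not a Mule class.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a scheduler that runs a poll task at a fixed frequency.
class FixedFrequencyPollScheduler {
    private final Runnable pollTask;
    private final long frequencyMillis;
    private ScheduledExecutorService executor;

    FixedFrequencyPollScheduler(Runnable pollTask, long frequencyMillis) {
        this.pollTask = pollTask;
        this.frequencyMillis = frequencyMillis;
    }

    // Starts the periodic execution; calling start() twice has no additional effect.
    public synchronized void start() {
        if (executor != null) {
            return;
        }
        executor = Executors.newSingleThreadScheduledExecutor();
        // Note: if the task throws, scheduleAtFixedRate suppresses all later runs.
        executor.scheduleAtFixedRate(this::execute, frequencyMillis, frequencyMillis, TimeUnit.MILLISECONDS);
    }

    // Stops the periodic execution and releases the worker thread.
    public synchronized void stop() {
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
        executor = null;
    }

    // Runs the poll once, immediately, regardless of the schedule.
    public void execute() {
        pollTask.run();
    }

    public synchronized boolean isStarted() {
        return executor != null;
    }
}
```

Keeping execute() independent of start() is what lets a poll be triggered on demand while its schedule is stopped.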

Stateful Poll

For stateful polls, Mule will provide some default Scheduler implementations based on the state strategy. For example:

  1. QueueScheduler: Queues the task if another one is still running
  2. DismissScheduler: Dismisses the next task if another one is still running
  3. FailScheduler: Throws an exception if a task is still running and we want to schedule a new one.

These implementations will use a SchedulingStrategy that determines the next time the schedule needs to run.
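
The dismiss and fail behaviours listed above could be built on a single atomic flag that tracks whether a task is in progress. The sketch below is one possible approach, not the planned Mule implementation. `OverlapPolicy` and `NonOverlappingPollTask` are hypothetical names, and the queueing variant is left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// What to do when a new execution is requested while one is still running.
enum OverlapPolicy { DISMISS, FAIL }

// Wraps a poll task so that two executions never overlap.
class NonOverlappingPollTask {
    private final Runnable delegate;
    private final OverlapPolicy policy;
    private final AtomicBoolean running = new AtomicBoolean(false);

    NonOverlappingPollTask(Runnable delegate, OverlapPolicy policy) {
        this.delegate = delegate;
        this.policy = policy;
    }

    // Returns true if the task ran, false if it was dismissed because another run was in progress.
    public boolean execute() {
        if (!running.compareAndSet(false, true)) {
            if (policy == OverlapPolicy.FAIL) {
                throw new IllegalStateException("A poll execution is already running");
            }
            return false;
        }
        try {
            delegate.run();
            return true;
        } finally {
            // Clear the flag even if the task throws, so later runs are not blocked
            running.set(false);
        }
    }
}
```

The compareAndSet call makes the check and the flag update a single atomic step, so two threads cannot both enter the task.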

Pros

  1. The scheduler is configurable per poll, so we eliminate things like the quartz inbound endpoint.
  2. Gives the user the flexibility to define their own scheduling strategy
  3. Does not affect backward compatibility

Cons

  1. Poll will keep the frequency attribute (deprecated), which might be confusing.
  2. The optional message source might be confusing for the user