Skip to content
fernandofederico1984 edited this page Jun 18, 2013 · 30 revisions

Motivation

The main motivation of this feature is to generate a Management service of polls and make of poll the unique way to do polling in mule.

Uses cases

  1. As a user, I want to execute a given flow :(every hour regardless of the day/week, on Monday, Weds, Friday at 12 AM, Every 5 minutes)
  2. As a user, I want to be able to try out my poll: (I do not have to wait for the next poll to execute (which may be a day away) to test what will happen, I do not get confused about what is happening because polls continue to execute while I'm trying to debug what is happening)
  3. As a user, I want to be able to test my poll: (I want to run a unit test that executes a poll, I want to start a test without having polls running.)
  4. As an extension developer, I want better APIs to start, stop and execute polls.
  5. As a user, I want to prevent poll executions from executing concurrently
  6. As an extension developer I want to change the polling strategy on runtime
  7. As an extension developer I want to start/stop/run my polls
  8. As an extension developer I want to enrich with notifications of the poll execution.

Design

Mule API to support extension developer's features.

Nowadays we don't have a mule API in order to manage the polls, this implies that are features that we cannot perform or are hard to develop.

Polling Manager and Scheduler

We could have a poll manager. This will handle all types of polling (in the future, for now we can make it handle the poll elements only)

Definition

/**
 * <p>
 * The PollManager is capable of manage all the {@link org.mule.transport.polling.schedulers.PollScheduler} of
 * the mule application.
 * </p>
 * <p>
 * It is registered in the {@link org.mule.api.MuleContext} so it can be accessed by just doing
 * {@link org.mule.api.MuleContext#getPollManager()}
 * </p>
 * <p>
 * The PollManager allows users to hook interceptors so they can modify/wrap the {@link PollScheduler} that is going to
 * be registered or just execute custom code at the registration/unRegistration of a new Scheduler. The PollManager is not
 * thought to be an extension point by itself, Mule will provide the PollManager implementation.
 * If, for some reason,  users change the PollManager implementation it is consider to be done under their own risk.
 * Use the interceptors to hook into the PollManager behaviour.
 * </p>
 * <p>
 * The PollManager synchronizes the access to the {@link PollScheduler} that are registered in it.
 * </p>
 */
public interface PollManager
{

    /**
     * <p>
     * Adds a new {@link PollScheduler} to the poll manager.
     * </p>
     * <p>
     * As part of the registration process the {@link PollScheduler} is started.
     * </p>
     * <p>
     * If there is a scheduler registered with the same {@link org.mule.transport.polling.schedulers.PollSchedulerID}
     * then the registered one is unregistered and is replaced by the new one.
     * </p>
     *
     * @param scheduler The {@link PollScheduler} that needs to be registered from now on.
     * @throws StartException            In case the {@link PollScheduler} could not be started as part of
     *                                   the registration process. The registration process is not completed, so the new
     *                                   {@link PollScheduler} will not be registered
     * @throws PollSchedulerRegistration If a Scheduler is already registered and it could not be stopped.
     *                                   The registration process is not completed, so the new {@link PollScheduler}
     *                                   will not be registered
     */
    void register(PollScheduler scheduler) throws StartException, PollSchedulerRegistration;


    /**
     * <p>
     * Removes from the PollManager a set of {@link PollScheduler} that matches with a {@link Predicate}
     * </p>
     * <p>
     * The unRegister process of a {@link PollScheduler} stops the scheduler before removing it from the PollManager
     * </p>
     *
     * @param schedulerCriteria The {@link Predicate} criteria that will match with a {@link PollScheduler}
     * @return A Collection if the {@link PollScheduler} that could not be removed from the PollManager.
     *         Empty List if successful
     */
    Collection<PollScheduler> unRegister(Predicate schedulerCriteria);


    /**
     * <p>
     * Adds a new interceptor to the PollManager interceptor list. The interceptors are call before the registration
     * process and unregistration process. If a {@link PollScheduler} is registered before an interceptor is added.
     * Then the interceptor will not have any effect on the registered {@link PollScheduler}
     * </p>
     * <p>
     * The order of the interceptors is equals to the adding order
     * </p>
     *
     * @param interceptor The {@link RegisteringInterceptor} that needs to be added.
     */
    void add(RegisteringInterceptor interceptor);


    /**
     * <p>
     * Removes interceptors that matches with the {@link Predicate} criteria
     * </p>
     *
     * @param interceptorCriteria <p>
     *                            The {@link Predicate} matcher to remove an interceptor
     *                            </p>
     */
    void remove(Predicate interceptorCriteria);
}

The Scheduler knows when to execute the receiver and for whom.

/**
 * <p>
 * A PollScheduler is in charge of schedule a task for a poll element.
 * </p>
 * <p>
 * The {@link org.mule.api.lifecycle.Startable#start()} method must start the scheduler which implies executing tasks
 * based on a {@link SchedulingStrategy}
 * </p>
 * <p>
 * The {@link org.mule.api.lifecycle.Stoppable#stop()} method ends up finishing all the scheduled tasks
 * </p>
 */
public interface PollScheduler extends Stoppable, Startable
{

  /**
     * @return The Scheduler identifier
     */
    PollSchedulerID getID();

    /**
     * <p>
     * Executes a Scheduler Task. The timing of when it is executed depends on the PollScheduler implementation.
     * </p>
     *
     * @throws MuleException In case the execution fails
     */
    void execute() throws MuleException;
}

The PollSchedulerId defines the Whom of the scheduler (the flow and endpoint address defines unicity)

/**
 * <p>
 * Identifier for a {@link PollScheduler}. A PollScheduler schedules task for a poll element this identifier represents
 * the location of that poll element.
 * </p>
 * <p>
 * A Single {@link org.mule.construct.Flow} might have multiple Poll {@link org.mule.api.endpoint.InboundEndpoint}, so this
 * ID is composed by both elements.
 * </p>
 */
public class PollSchedulerID
{

    /**
     * <p>
     * The flow name where the poll element is located
     * </p>
     */
    protected String flowName;

    /**
     * <p>
     * The {@link org.mule.api.endpoint.InboundEndpoint} address of the PollElement
     * </p>
     */
    protected EndpointURI endpointURI;

    public PollSchedulerID(String flowName, EndpointURI endpointURI)
    {
        this.flowName = flowName;
        this.endpointURI = endpointURI;
    }

    public String getFlowName()
    {
        return flowName;
    }

    public EndpointURI getEndpointURI()
    {
        return endpointURI;
    }
}


```java
/**
 * <p>
 * Interceptor to be executed on the registration/unregistration process.
 * </p>
 */
public interface RegisteringInterceptor
{

    /**
     * <p>
     * Acts before {@PollScheduler} registration
     * </p>
     */
    PollScheduler onRegister(PollScheduler pollScheduler);

    /**
     * <p>
     * Acts before {@PollScheduler} unRegistration
     * </p>
     */
    
    PollScheduler onUnRegister(PollScheduler pollScheduler);
}

Usage

Starting/running/stopping polls.

Covered scenarios:

  1. I do not have to wait for the next poll to execute (which may be a day away) to test what will happen
  2. As an extension developer, I want better APIs to start, stop and execute polls.
  3. As an extension developer I want to change the polling strategy on runtime
  4. As an extension user I want to start/stop/run my polls
public class MyPollingService extends AbstractService implements PollingService {

 public void executePoll(String appName, final String flowName) throws MuleException
    {
        MuleContext context = getContext(appName);
        PollingManager pollingManager = context.getPollingManager();

        Collection<PollScheduler> schedulers = pollingManager.lookupSchedulers(PollSchedulerID.getFlowNamePredicate(flowName))
        
        for (PollScheduler scheduler : schedulers){
            scheduler.run();
        }

    }
}

Starting with the polls disabled

  1. I do not get confused about what is happening because polls continue to execute while I'm trying to debug what is happening
  2. I want to start a test without having polls running.
  3. As an extension developer I want to change the polling strategy on runtime
  4. As an extension developer I want to enrich with notifications of the poll execution.
public class BoostrapExtension implements Initialisable, MuleContextAware
{

    private MuleContext muleContext;

    @Override
    public void initialise() throws InitialisationException
    {
        muleContext.getPollManager().add(new RegisteringInterceptor
{

    @Override
    public PollScheduler onRegister(final PollScheduler pollScheduler)
    {
        return new PollScheduler()
        {
            @Override
            public PollSchedulerID getID()
            {
                return pollScheduler.getID();
            }

            @Override
            public void execute() throws MuleException
            {
                pollScheduler.execute();
            }

            @Override
            public void start() throws MuleException
            {
                // Do Nothing
            }

            @Override
            public void stop() throws MuleException
            {
                // Do Nothing
            }
        };
    }

    @Override
    public PollScheduler onUnRegister(PollScheduler pollScheduler)
    {
        return pollScheduler;
    }

    }

    @Override
    public void setMuleContext(MuleContext context)
    {
        this.muleContext = context;
    }
}

XML definition for final users

The new implementation of poll allows users to inject the polling strategy.

The default implementation to support backward compatibility:

<flow name="test">
   <poll frequency="1000">
       <!-- message source here -->
   </poll>

</flow>

Injecting polling strategy by using the quartz module:

Now the quartz module will allow users to define a polling strategy that can be used by poll, and deprecate the quartz inbound endpoint.

<quartz:polling-strategy name="myPollingStrategy" cronExpression="" />

<flow name="test">
   <poll polling-strategy-ref="myPollingStrategy">
       <!-- message source here -->
   </poll>

</flow>

Defining your own polling strategy

Users can create a bean that defines the polling strategy:

<spring:bean name="customPollingStrategy" class="org.mule.mine.MyPollingStrategyDefinition">
     <!-- Attributes if needed -->
</spring:bean>

<flow name="test">
   <poll polling-strategy-ref="customPollingStrategy">
       <!-- message source here -->
   </poll>

</flow>

And the user can define his own scheduler

public class MyPollingStrategyDefinition implements PollSchedulerBuilder
{
    @Override
    public PollScheduler buildFor(AbstractPollingMessageReceiver receiver)
    {
        return createPollScheduler();
    }

}

State full Poll

Nowadays we can make a poll state full by setting the processing strategy in the flow.

 <flow name="pollfoo" processingStrategy="synchronous">
        <poll frequency="1000">
            <outbound-endpoint ref="foo"/>
        </poll>
        <component class="org.mule.test.integration.PollingTestCase$FooComponent"/>
    </flow>