fernandofederico1984 edited this page Jun 18, 2013 · 30 revisions

Stories

  1. As a user, I want to execute a given flow on a schedule (for example: every hour regardless of the day or week; on Monday, Wednesday and Friday at 12 AM; every 5 minutes).
  2. As a user, I want to be able to try out my poll: I do not want to wait for the next poll execution (which may be a day away) to see what will happen, and I do not want to get confused because polls continue to execute while I am debugging.
  3. As a user, I want to be able to test my poll: I want to run a unit test that executes a poll, and I want to start a test without having polls running.
  4. As an extension developer, I want better APIs to start, stop and execute polls.
  5. As a user, I want to prevent poll executions from running concurrently.
  6. As an extension developer, I want to change the polling strategy at runtime.
  7. As an extension developer, I want to start, stop and run my polls.
  8. As an extension developer, I want to enrich poll execution with notifications.
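
Story 5 (no concurrent executions of the same poll) is not covered by the proposals further down, so here is a minimal sketch of one possible approach: a guard that skips a run while the previous one is still in flight. `NonOverlappingPoll` and `tryRun` are hypothetical names used only for illustration; they are not part of Mule or of this proposal.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: none of these types belong to Mule.
class NonOverlappingPollDemo
{
    /** Wraps a poll task so that at most one execution is in flight at a time. */
    static final class NonOverlappingPoll
    {
        private final AtomicBoolean running = new AtomicBoolean(false);
        private final Runnable pollTask;

        NonOverlappingPoll(Runnable pollTask)
        {
            this.pollTask = pollTask;
        }

        /** Returns true if the poll ran, false if it was skipped because another run is active. */
        boolean tryRun()
        {
            // Only the caller that flips the flag from false to true may execute the task.
            if (!running.compareAndSet(false, true))
            {
                return false;
            }
            try
            {
                pollTask.run();
                return true;
            }
            finally
            {
                // Always release the guard, even if the task throws.
                running.set(false);
            }
        }
    }

    public static void main(String[] args)
    {
        AtomicInteger executions = new AtomicInteger();
        NonOverlappingPoll[] holder = new NonOverlappingPoll[1];

        // The task tries to trigger the same poll again while it is running;
        // that nested attempt must be skipped.
        holder[0] = new NonOverlappingPoll(() -> {
            executions.incrementAndGet();
            System.out.println("nested attempt ran: " + holder[0].tryRun());
        });

        System.out.println("first attempt ran: " + holder[0].tryRun());
        System.out.println("executions: " + executions.get());
    }
}
```

Skipping the overlapping run is only one policy; queueing it until the current run finishes would be an equally valid choice, depending on what users expect.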

Proposals

Mule API to support extension developer's features.

Currently there is no Mule Java API for managing polls, so some features cannot be implemented at all and others are hard to develop.

Polling Manager and Scheduler

We could introduce a poll manager. In the future it would handle all types of polling; for now it can handle only the poll elements.

Definition

/**
 * Handles the polls in the application.
 */
public final class PollingManager implements Startable, Stoppable, MuleContextAware
{

    protected MuleContext muleContext;

    /**
     * <p>
     *     Adds a new polling scheduler
     * </p>
     */
    public void register(PollScheduler scheduler);

    /**
     * <p>
     *     Removes the schedulers whose id matches the given predicate
     * </p>
     */
    public void unRegister(Predicate<PollSchedulerId> comparator);

    /**
     * <p>
     *     Retrieves the schedulers whose id matches the given predicate
     * </p>
     */
    public Collection<PollScheduler> lookupSchedulers(Predicate<PollSchedulerId> comparator);

    @Override
    public void start() throws MuleException
    {
        muleContext.fireNotification(new PollingManagerNotification(START_BEGIN, this));

        // Starts the schedulers
        doStart();
        
        muleContext.fireNotification(new PollingManagerNotification(START_END, this));
    }


    @Override
    public void stop() throws MuleException
    {
        muleContext.fireNotification(new PollingManagerNotification(STOP_BEGIN, this));

        // Stops the schedulers
        doStop();

        muleContext.fireNotification(new PollingManagerNotification(STOP_END, this));
    }


    @Override
    public void setMuleContext(MuleContext context)
    {
        this.muleContext = context;
    }
}
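
The definition above leaves `register`, `unRegister` and `lookupSchedulers` without bodies. As a rough illustration of how the predicate-based registry could behave, here is a self-contained sketch. It assumes simplified stand-in types (`SchedulerId`, `Scheduler`, `Registry`) and uses `java.util.function.Predicate` (method `test`) instead of the `Predicate` with `evaluate` used in this proposal; none of it is the actual Mule implementation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Hypothetical sketch with stand-in types; not the Mule API.
class PollRegistryDemo
{
    /** Stand-in for PollSchedulerId; the endpoint is simplified to a String. */
    static final class SchedulerId
    {
        final String flowName;
        final String endpoint;

        SchedulerId(String flowName, String endpoint)
        {
            this.flowName = flowName;
            this.endpoint = endpoint;
        }
    }

    /** Stand-in for PollScheduler, reduced to what the registry needs. */
    interface Scheduler
    {
        SchedulerId getId();
    }

    /** In-memory registry that selects schedulers by a predicate on their id. */
    static final class Registry
    {
        // Copy-on-write keeps iteration safe if schedulers are added while polling.
        private final List<Scheduler> schedulers = new CopyOnWriteArrayList<>();

        void register(Scheduler scheduler)
        {
            schedulers.add(scheduler);
        }

        void unRegister(Predicate<SchedulerId> matcher)
        {
            schedulers.removeIf(scheduler -> matcher.test(scheduler.getId()));
        }

        Collection<Scheduler> lookupSchedulers(Predicate<SchedulerId> matcher)
        {
            List<Scheduler> result = new ArrayList<>();
            for (Scheduler scheduler : schedulers)
            {
                if (matcher.test(scheduler.getId()))
                {
                    result.add(scheduler);
                }
            }
            return result;
        }
    }

    public static void main(String[] args)
    {
        Registry registry = new Registry();
        SchedulerId orders = new SchedulerId("orders", "polling://1");
        SchedulerId invoices = new SchedulerId("invoices", "polling://2");
        registry.register(() -> orders);
        registry.register(() -> invoices);

        Predicate<SchedulerId> ordersOnly = id -> id.flowName.equalsIgnoreCase("orders");
        System.out.println(registry.lookupSchedulers(ordersOnly).size()); // prints 1

        registry.unRegister(ordersOnly);
        System.out.println(registry.lookupSchedulers(ordersOnly).size()); // prints 0
    }
}
```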

The Scheduler knows when to execute the receiver and for whom.

public interface PollScheduler extends Stoppable, Startable
{
    /**
     * <p>
     *     Scheduler identifier
     * </p>
     */
    PollSchedulerId getId();

    /**
     * <p>
     *     Runs a scheduler on demand
     * </p>
     */
    void run() throws MuleException;

    /**
     * <p>
     *     Runs a scheduler on demand with a mule event
     * </p>
     */
    void run(MuleEvent event) throws MuleException;

}

The PollSchedulerId defines the "whom" of the scheduler: the flow name and the endpoint address together make it unique.

public class PollSchedulerId
{

    protected String flowConstructName;
    protected EndpointURI endpointURI;

    public PollSchedulerId(String flowConstructName, EndpointURI endpointURI)
    {
        this.flowConstructName = flowConstructName;
        this.endpointURI = endpointURI;
    }

    public String getFlowConstructName()
    {
        return flowConstructName;
    }

    public EndpointURI getEndpointURI()
    {
        return endpointURI;
    }
}
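
If the flow name and the endpoint address together are meant to identify a scheduler, the id class would presumably need value-based `equals` and `hashCode`, which the definition above does not show. The sketch below illustrates the idea with a hypothetical `SchedulerKey` class in which the endpoint is simplified to a `String`; it is an assumption about the intent, not part of the proposal.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch: SchedulerKey stands in for PollSchedulerId.
class PollSchedulerIdEqualityDemo
{
    static final class SchedulerKey
    {
        private final String flowConstructName;
        private final String endpointAddress;

        SchedulerKey(String flowConstructName, String endpointAddress)
        {
            this.flowConstructName = flowConstructName;
            this.endpointAddress = endpointAddress;
        }

        @Override
        public boolean equals(Object other)
        {
            if (this == other)
            {
                return true;
            }
            if (!(other instanceof SchedulerKey))
            {
                return false;
            }
            SchedulerKey that = (SchedulerKey) other;
            // Two keys are equal only when both the flow and the endpoint match.
            return Objects.equals(flowConstructName, that.flowConstructName)
                   && Objects.equals(endpointAddress, that.endpointAddress);
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(flowConstructName, endpointAddress);
        }
    }

    public static void main(String[] args)
    {
        Set<SchedulerKey> keys = new HashSet<>();
        keys.add(new SchedulerKey("test", "polling://1"));
        keys.add(new SchedulerKey("test", "polling://1")); // same flow and endpoint: duplicate
        keys.add(new SchedulerKey("test", "polling://2")); // same flow, different endpoint
        System.out.println(keys.size()); // prints 2
    }
}
```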

Usage

Starting/running/stopping polls.

Covered scenarios:

  1. I do not have to wait for the next poll to execute (which may be a day away) to test what will happen
  2. As an extension developer, I want better APIs to start, stop and execute polls.
  3. As an extension developer, I want to change the polling strategy at runtime.
  4. As an extension user, I want to start, stop and run my polls.

public class MyPollingService extends AbstractService implements PollingService {

    public void executePoll(String appName, final String flowName) throws MuleException
    {
        MuleContext context = getContext(appName);
        PollingManager pollingManager = context.getPollingManager();

        Collection<PollScheduler> schedulers = pollingManager.lookupSchedulers(new Predicate<PollSchedulerId>()
        {
            @Override
            public boolean evaluate(PollSchedulerId id)
            {
                return id.getFlowConstructName().equalsIgnoreCase(flowName);
            }
        });
        
        for (PollScheduler scheduler : schedulers){
            scheduler.run();
        }

    }
}

Starting with the polls disabled.

Covered scenarios:

  1. I do not get confused about what is happening because polls continue to execute while I'm trying to debug what is happening
  2. I want to start a test without having polls running.
  3. As an extension developer, I want to change the polling strategy at runtime.
  4. As an extension developer, I want to enrich poll execution with notifications.

public class BootstrapExtension implements Initialisable, MuleContextAware
{

    private MuleContext muleContext;

    @Override
    public void initialise() throws InitialisationException
    {
        muleContext.getNotificationManager().addListener(new ServerNotificationListener<PollingManagerNotification>(){

            @Override
            public void onNotification(PollingManagerNotification notification)
            {
                if ( isStartBeginOfPollingManager(notification) ){
                    // getSource() is declared to return Object, so a cast is needed here.
                    PollingManager manager = (PollingManager) notification.getSource();

                    Collection<PollScheduler> schedulers = manager.lookupSchedulers(new Predicate<PollSchedulerId>()
                    {
                        @Override
                        public boolean evaluate(PollSchedulerId id)
                        {
                            // The "polling" prefix is the scheme of the endpoint address, not part of its path.
                            String scheme = id.getEndpointURI().getScheme();

                            return "polling".equals(scheme);
                        }
                    });
                    
                    for (PollScheduler scheduler : schedulers ){
                        manager.register(new MyScheduler(scheduler.getId()));
                    }

                }
            }
        });
    }

    @Override
    public void setMuleContext(MuleContext context)
    {
        this.muleContext = context;
    }
}

XML definition for end users

The new implementation of poll allows users to inject the polling strategy.

The default implementation, to support backward compatibility:

<flow name="test">
   <poll frequency="1000">
       <!-- message source here -->
   </poll>

</flow>

Injecting a polling strategy by using the quartz module:

The quartz module will now allow users to define a polling strategy that poll can use, and the quartz inbound endpoint will be deprecated.

<quartz:polling-strategy name="myPollingStrategy" cronExpression="" />

<flow name="test">
   <poll polling-strategy-ref="myPollingStrategy">
       <!-- message source here -->
   </poll>

</flow>
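
The `cronExpression` attribute is left empty in the example above. Assuming it accepts standard Quartz cron syntax (seconds, minutes, hours, day-of-month, month, day-of-week) and that "12 AM" in Story 1 means midnight, the three schedules from that story might be expressed as follows. The class and method names are hypothetical and only serve to hold the strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for example expressions; assumes Quartz cron syntax.
class CronExamplesDemo
{
    /** Maps each schedule named in Story 1 to a candidate Quartz cron expression. */
    static Map<String, String> storyOneSchedules()
    {
        Map<String, String> schedules = new LinkedHashMap<>();
        // Second 0 of minute 0 of every hour, every day.
        schedules.put("every hour", "0 0 * * * ?");
        // Midnight on Mondays, Wednesdays and Fridays ('?' leaves day-of-month unspecified).
        schedules.put("Monday, Wednesday and Friday at 12 AM", "0 0 0 ? * MON,WED,FRI");
        // Every fifth minute, starting at minute 0 of each hour.
        schedules.put("every 5 minutes", "0 0/5 * * * ?");
        return schedules;
    }

    public static void main(String[] args)
    {
        for (Map.Entry<String, String> entry : storyOneSchedules().entrySet())
        {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```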

Defining your own polling strategy

Users can create a bean that defines the polling strategy:

<spring:bean name="customPollingStrategy" class="org.mule.mine.MyPollingStrategyDefinition">
     <!-- Attributes if needed -->
</spring:bean>

<flow name="test">
   <poll polling-strategy-ref="customPollingStrategy">
       <!-- message source here -->
   </poll>

</flow>

The user can then define their own scheduler:

public class MyPollingStrategyDefinition implements PollSchedulerBuilder
{
    @Override
    public PollScheduler buildFor(AbstractPollingMessageReceiver receiver)
    {
        // createPollScheduler() stands for user code that builds the custom PollScheduler.
        return createPollScheduler();
    }

}
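
The builder above delegates to `createPollScheduler()`, which the proposal does not define. As one possible shape for such a scheduler, here is a sketch of a fixed-frequency implementation built on `ScheduledExecutorService`, exposing the same three operations as `PollScheduler` (start, stop, run on demand). `FixedFrequencyScheduler` is a hypothetical name, the poll is simplified to a `Runnable`, and Mule types are deliberately left out so the example stays self-contained.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: shows the start/stop/run contract without any Mule types.
class FixedFrequencySchedulerDemo
{
    static final class FixedFrequencyScheduler
    {
        private final Runnable poll;
        private final long frequencyMillis;
        private ScheduledExecutorService executor;

        FixedFrequencyScheduler(Runnable poll, long frequencyMillis)
        {
            this.poll = poll;
            this.frequencyMillis = frequencyMillis;
        }

        /** Starts periodic execution; calling it again while started has no effect. */
        synchronized void start()
        {
            if (executor != null)
            {
                return;
            }
            executor = Executors.newSingleThreadScheduledExecutor();
            // First execution happens after one full period, then repeats at a fixed rate.
            executor.scheduleAtFixedRate(poll, frequencyMillis, frequencyMillis, TimeUnit.MILLISECONDS);
        }

        /** Stops periodic execution; the scheduler can be started again later. */
        synchronized void stop()
        {
            if (executor != null)
            {
                executor.shutdownNow();
                executor = null;
            }
        }

        /** Runs the poll once on demand, in the calling thread, whether or not the timer is started. */
        void run()
        {
            poll.run();
        }
    }

    public static void main(String[] args)
    {
        AtomicInteger polls = new AtomicInteger();
        // One-hour frequency, so the timer does not fire during this short demo.
        FixedFrequencyScheduler scheduler = new FixedFrequencyScheduler(polls::incrementAndGet, 3_600_000L);

        scheduler.run(); // on-demand execution without waiting for the next poll
        System.out.println(polls.get()); // prints 1

        scheduler.start();
        scheduler.stop();
        System.out.println(polls.get()); // prints 1
    }
}
```

Keeping `run()` independent of `start()` is what lets a user trigger a poll from a test while the periodic schedule stays disabled.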