Tuesday, September 28, 2010

Developed a capability in the Camel Quartz Component to schedule route activation, de-activation, suspension and resumption

I have just added a feature in Camel-quartz (Camel version 2.5) to facilitate schedule based route activation, de-activation, suspension and resumption.

In enterprise environments, it is frequently necessary to schedule routes to run at certain times during the day or night (for e.g. Routes tied to Batch jobs...).

Scheduling of routes typically involves  the following capabilities

  • Route activation - Starting a route a given start time if the route is in a stopped state awaiting activation.
  • Route de-activation - Shutting down an otherwise active and started route at a given time. 
  • Route suspension - Simply de-activating the route consumer endpoint URI declared on the from(...) section of the route from listening on a given port. The route is still considered as started, however, clients will not be able to send requests along the route. 
  • Route resumption - Resuming the listener on a formerly suspended route consumer endpoint URI. This route is ready to accept requests following route resumption and client requests will be accepted by the route consumer to be forwarded along the route.

The enforcement of the schedule is done via a ScheduledRoutePolicy that must be wired into the route. The ScheduledRoutePolicy currently supports 2 variants
  1. SimpleScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using dates, repeat counts and repeat intervals.
  2. CronScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using Cron Expressions.
SimpleScheduledRoutePolicy

In order to use a SimpleScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.SimpleScheduledRoutePolicy.  The following property values must be set on it to be useful
  • For Starting routes on a Schedule
    • routeStartDate - the initial scheduled Date and time for route start
    • routeStartRepeatCount - no of times to repeat the job
    • routeStartRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
  • For Stopping routes on a Schedule
    • routeStopDate - the initial scheduled Date and time for route stop
    • routeStopRepeatCount - no of times to repeat the job
    • routeStopRepeatInterval - the time interval in milliseconds to trigger the next attempt to stop the route
    • routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
    • routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
  • For Suspending routes on a Schedule
    • routeSuspendDate - the initial scheduled Date and time for route suspension
    • routeSuspendRepeatCount - no of times to repeat the job
    • routeSuspendRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
  • For Resuming routes on a Schedule
    • routeResumeDate - the initial scheduled Date and time for route suspension
    • routeResumeRepeatCount - no of times to repeat the job
    • routeResumeRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
Given below are examples using a SimpleScheduledRoutePolicy

@Test
public void testScheduledStartRoutePolicy() throws Exception {
    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");        
        
    success.expectedMessageCount(1);
        
    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {   
            SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy();
            long startTime = System.currentTimeMillis() + 3000L;
            policy.setRouteStartDate(new Date(startTime));
            policy.setRouteStartRepeatCount(1);
            policy.setRouteStartRepeatInterval(3000);
                
            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
        
    Thread.currentThread().sleep(5000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}

CronScheduledRoutePolicy

In order to use a CronScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy.  The following property values must be set on it to be useful
  • For Starting routes on a Schedule
    • routeStartTime - the initial scheduled Date and time as a Cron Expression for route start
  • For Stopping routes on a Schedule
    • routeStopTime - the initial scheduled Date and time as a Cron Expression for route stop
    • routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
    • routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
  • For Suspending routes on a Schedule
    • routeSuspendTime - the initial scheduled Date and time as a Cron Expression for route suspension
  • For Resuming routes on a Schedule
    • routeResumeTime - the initial scheduled Date and time as a Cron Expression for route resumption
Given below are examples using a CronScheduledRoutePolicy

@Test
public void testScheduledStartRoutePolicy() throws Exception {

    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");        
        
    success.expectedMessageCount(1);
        
    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {    
            CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
            policy.setRouteStartTime("*/3 * * * * ?");
                
            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
        
    Thread.currentThread().sleep(4000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}

Complete Examples

SimpleScheduledRoutePolicyTest.java
CronScheduledRoutePolicyTest.java

Further Details

https://issues.apache.org/activemq/browse/CAMEL-2936

4 comments:

W. O. Wutzke said...

Great stuff!
Keep up with the awesome work! ;)
Cheers!

Ing. Marco Almeida said...

Can u upload the same example using blueprint ?? ty :D

Ing. Marco Almeida said...

I need to start an from:ftp to:ftp every day each 5 minutes. I need a delay for be sure the file copy too. Can u help me ?? Im trying to use blueprint. This app is for create a weather radar decoder. Ty

Unknown said...

My task is read data from database daily once by reading a flag from database(to check whether scheduler to run or not). If the flag is set to false I shouldn't run the scheduler configured route but the route is stopping when I do suspend, which is also fine but route is not getting activated once again the flag is set to true in database. So I don't have any stop date and time etc here. How do we achieve this??? Can anybody please help.