Tuesday, September 28, 2010

Developed a capability in the Camel Quartz Component to schedule route activation, de-activation, suspension and resumption

I have just added a feature in Camel-quartz (Camel version 2.5) to facilitate schedule based route activation, de-activation, suspension and resumption.

In enterprise environments, it is frequently necessary to schedule routes to run at certain times during the day or night (for e.g. Routes tied to Batch jobs...).

Scheduling of routes typically involves  the following capabilities

  • Route activation - Starting a route a given start time if the route is in a stopped state awaiting activation.
  • Route de-activation - Shutting down an otherwise active and started route at a given time. 
  • Route suspension - Simply de-activating the route consumer endpoint URI declared on the from(...) section of the route from listening on a given port. The route is still considered as started, however, clients will not be able to send requests along the route. 
  • Route resumption - Resuming the listener on a formerly suspended route consumer endpoint URI. This route is ready to accept requests following route resumption and client requests will be accepted by the route consumer to be forwarded along the route.

The enforcement of the schedule is done via a ScheduledRoutePolicy that must be wired into the route. The ScheduledRoutePolicy currently supports 2 variants
  1. SimpleScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using dates, repeat counts and repeat intervals.
  2. CronScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using Cron Expressions.
SimpleScheduledRoutePolicy

In order to use a SimpleScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.SimpleScheduledRoutePolicy.  The following property values must be set on it to be useful
  • For Starting routes on a Schedule
    • routeStartDate - the initial scheduled Date and time for route start
    • routeStartRepeatCount - no of times to repeat the job
    • routeStartRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
  • For Stopping routes on a Schedule
    • routeStopDate - the initial scheduled Date and time for route stop
    • routeStopRepeatCount - no of times to repeat the job
    • routeStopRepeatInterval - the time interval in milliseconds to trigger the next attempt to stop the route
    • routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
    • routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
  • For Suspending routes on a Schedule
    • routeSuspendDate - the initial scheduled Date and time for route suspension
    • routeSuspendRepeatCount - no of times to repeat the job
    • routeSuspendRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
  • For Resuming routes on a Schedule
    • routeResumeDate - the initial scheduled Date and time for route suspension
    • routeResumeRepeatCount - no of times to repeat the job
    • routeResumeRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
Given below are examples using a SimpleScheduledRoutePolicy

@Test
public void testScheduledStartRoutePolicy() throws Exception {
    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");        
        
    success.expectedMessageCount(1);
        
    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {   
            SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy();
            long startTime = System.currentTimeMillis() + 3000L;
            policy.setRouteStartDate(new Date(startTime));
            policy.setRouteStartRepeatCount(1);
            policy.setRouteStartRepeatInterval(3000);
                
            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
        
    Thread.currentThread().sleep(5000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}

CronScheduledRoutePolicy

In order to use a CronScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy.  The following property values must be set on it to be useful
  • For Starting routes on a Schedule
    • routeStartTime - the initial scheduled Date and time as a Cron Expression for route start
  • For Stopping routes on a Schedule
    • routeStopTime - the initial scheduled Date and time as a Cron Expression for route stop
    • routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
    • routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
  • For Suspending routes on a Schedule
    • routeSuspendTime - the initial scheduled Date and time as a Cron Expression for route suspension
  • For Resuming routes on a Schedule
    • routeResumeTime - the initial scheduled Date and time as a Cron Expression for route resumption
Given below are examples using a CronScheduledRoutePolicy

@Test
public void testScheduledStartRoutePolicy() throws Exception {

    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");        
        
    success.expectedMessageCount(1);
        
    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {    
            CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
            policy.setRouteStartTime("*/3 * * * * ?");
                
            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
        
    Thread.currentThread().sleep(4000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}

Complete Examples

SimpleScheduledRoutePolicyTest.java
CronScheduledRoutePolicyTest.java

Further Details

https://issues.apache.org/activemq/browse/CAMEL-2936

Friday, September 3, 2010

Overriding a Property value set in a properties file with a JVM System Property

As of Camel 2.5, a new capability now exists in the most excellent and quite useful Camel Properties component to override a property value at runtime using a JVM System property without the need to restart the application to pick up the change.
This may be accomplished by overriding a property value by creating or editing a JVM System property of the same name as the property it replaces.

An example of how this can be done is given below

PropertiesComponent pc = context.getComponent("properties", PropertiesComponent.class);
pc.setCache(false);
        
System.setProperty("cool.end", "mock:override");
System.setProperty("cool.result", "override");

context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("direct:start").to("properties:cool.end");
        from("direct:foo").to("properties:mock:{{cool.result}}");
    }
});
context.start();

getMockEndpoint("mock:override").expectedMessageCount(2);

template.sendBody("direct:start", "Hello World");
template.sendBody("direct:foo", "Hello Foo");

System.clearProperty("cool.end");
System.clearProperty("cool.result");
        
assertMockEndpointsSatisfied();

Further Details
https://issues.apache.org/activemq/browse/CAMEL-2791