Fellow Camel riders,
If you are a resident of Washington DC or plan on pitching your tents there in the last week of May, you might want to check out this event being held by FuseSource. There will be several technical training sessions on Apache Camel, ServiceMix, CXF and ActiveMQ.
The event also features presentations by noted speakers such as Gregor Hohpe and James Strachan.
You might also want to attend the Apache Camel training sessions by my fellow Camel committers Jon Anstey and Claus Ibsen. My FuseSource colleague and Apache Karaf committer Adrian Trenaman will be conducting training sessions on Apache CXF and Apache ServiceMix.
All in all it should be a cool event to meet (or is it integrate) with other Camel riders and get a small caravan going!!!
Wish I could join... Maybe sometime when the caravan rides through Dallas (Hint, Hint, nudge, nudge, say no more, say no more...)
Please find a link to the event below
http://fusesource.com/camelone2011
Open Source of Knowledge
Thursday, March 31, 2011
Wednesday, February 9, 2011
Applying multiple route policies on Camel Routes
Available as of Camel 2.7
I have recently added an enhancement in Camel 2.7 to apply multiple route policies concurrently on a Camel route.
It is often necessary to apply multiple policies, such as throttling, scheduling and ACLs, to a route concurrently, depending on the organization's integration requirements. In earlier versions of Camel, a route was limited to one route policy.
In the example below, the route testRoute has a startPolicy and a throttlePolicy applied concurrently. Both policies are applied as necessary on the route.
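To make the idea concrete, here is a minimal plain-Java sketch of a route that consults several policies for every exchange. It is a model only and does not use Camel's classes: SketchPolicy, SketchRoute and MultiPolicyDemo are hypothetical names invented for this illustration, and the two policies simply record that they were called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a route policy callback; NOT Camel's RoutePolicy interface. */
interface SketchPolicy {
    void onExchangeBegin(String routeId, List<String> log);
}

/** Hypothetical route model that holds any number of policies. */
class SketchRoute {
    private final String id;
    private final List<SketchPolicy> policies;

    SketchRoute(String id, SketchPolicy... policies) {
        this.id = id;
        this.policies = Arrays.asList(policies);
    }

    /** Every policy gets its callback, in declaration order, before the body is processed. */
    List<String> process(String body) {
        List<String> log = new ArrayList<>();
        for (SketchPolicy policy : policies) {
            policy.onExchangeBegin(id, log);
        }
        log.add("processed:" + body);
        return log;
    }
}

class MultiPolicyDemo {
    static List<String> run() {
        // Both policies only record that they were consulted (assumption for illustration).
        SketchPolicy startPolicy = (routeId, log) -> log.add("startPolicy@" + routeId);
        SketchPolicy throttlePolicy = (routeId, log) -> log.add("throttlePolicy@" + routeId);
        SketchRoute route = new SketchRoute("testRoute", startPolicy, throttlePolicy);
        return route.process("hello");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the sketch is only that the route keeps a list of policies rather than a single one, so each policy sees every exchange.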
Complete Examples
RoutePoliciesTest.java
RoutePoliciesRefTest.java
SpringRoutePoliciesTest.xml
Further Details
https://issues.apache.org/jira/browse/CAMEL-3254
Friday, January 14, 2011
Definitive book on Camel - "Camel in Action" released
My colleagues Claus Ibsen and Jon Anstey have released an excellent book on Apache Camel. I encourage anyone interested in learning about Camel to check out this book.
I have had the pleasure of working with Claus and Jon at FuseSource and Apache over the last several months and years and recommend this book for readers looking for a definitive guide on Camel written by authorities on the subject.
Please find the links to this book below
http://www.manning.com/ibsen/
http://camel.apache.org/2010/12/25/camel-in-action-is-done.html
I am sure readers and camel users at every level will benefit greatly through this wonderful body of work.
Tuesday, December 28, 2010
Developed a new Routebox component encapsulating camel contexts and routes as camel endpoints
A couple of weeks ago, I contributed an interesting and unique new Camel component called Routebox. Please find the details regarding the Routebox component below.
Before we proceed, a small note about Camel routes
Camel routes are crafted using the Java DSL or Spring XML to link URI endpoints representing protocols, technologies and infrastructure components, based on well-established integration patterns and integration requirements. The routes themselves are discrete, standalone and isolated. Routes are hosted by a Camel context, which is responsible for lifecycle management and administrative control of the hosted routes. Routes are not layered in any way, and the same is true of Camel contexts.
Now let us identify the need for a Routebox
In integration environments with complex, multi-faceted integration needs and a wide array of endpoint technologies, it is often necessary to craft an integration solution by creating a sense of layering among Camel routes, effectively organizing them into
Coarse grained routes - routes with Routebox endpoints that facade a collection of inner or lower level routes that are
- Application specific --> HR routes, Sales routes etc
- Objective specific --> Shipping routes, Fulfillment routes etc
- Technology specific --> Database routes, Caching routes, polling routes etc
Requests sent to Routebox endpoints on coarse grained routes are delegated to inner fine grained routes to achieve a specific integration objective. The Routebox collects the final inner result and processing continues with the next step along the coarse grained route.
The idea behind a Routebox is to act as a black box for a collection of fine grained routes and to direct payloads received from the Routebox endpoint to specific inner fine grained routes. The inner route is chosen either by a user-defined internal routing strategy or by a map that matches headers in payloads to inner routes.
This new component facilitates encapsulation and indirection of requests received via a Routebox consumer endpoint or producer endpoint on a coarse grained Camel route to a set of inner routes declared in an inner Camel context.
Further Routebox details
A Routebox endpoint consists of a Routebox URI of the type
routebox:routebox-name?...
Producers do not need to know the inner route endpoint URIs; they can simply invoke the Routebox URI endpoint.
The Routebox can be applied on a route as a route consumer or as a route producer.
Route Producer endpoints are of two types
- Endpoints that direct payloads to a Routebox consumer
- Endpoints that host an inner Camel context and direct payloads to inner routes hosted by that context
The Routebox URI has a query parameter (sendToConsumer) whose value specifies the type of producer endpoint.
The Routebox itself is currently JVM bound and uses an internal protocol (SEDA or Direct) to receive incoming requests. The choice of internal protocol is controlled by a query parameter (innerProtocol).
SEDA support allows asynchronous requests to be sent to the Routebox endpoint. Similarly, Direct support allows synchronous requests.
The inner routes in the Routebox support all manner of valid Camel endpoints. The inner context is a managed entity controlled by the Routebox endpoint. Starting or stopping the Routebox endpoint automatically triggers the stoppage of any inner routes and removal of the associated inner Camel context for Routebox consumers and producers hosting inner contexts and routes.
The dispatching to inner routes is determined by a dispatch strategy supplied by the Routebox designer (a user-defined class that implements org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy). This gives the Routebox designer complete control over directing incoming payloads to different fine grained routes. Alternatively, the Routebox designer can supply a HashMap (HashMap<String, String>) consisting of a header value (key) and an endpoint URI (value). The user can then use an exchange header to send keys that dictate the fine grained route to which the payload is sent.
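The map-based dispatch described above can be pictured with a small plain-Java sketch. It is an illustration under stated assumptions, not the component's implementation: DispatchMapSketch and the inner endpoint URIs (seda:addToCatalog, seda:findBook) are hypothetical, and the header name ROUTE_DISPATCH_KEY is simply the one used in the example code of this post.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of header-driven dispatch; NOT the Routebox component's code. */
class DispatchMapSketch {
    // Header name taken from the example in this post.
    static final String DISPATCH_HEADER = "ROUTE_DISPATCH_KEY";

    /** Looks up the inner endpoint URI for the dispatch key carried in the headers. */
    static String selectInnerEndpoint(Map<String, String> dispatchMap, Map<String, Object> headers) {
        Object key = headers.get(DISPATCH_HEADER);
        if (key == null) {
            throw new IllegalArgumentException("Missing header " + DISPATCH_HEADER);
        }
        String uri = dispatchMap.get(key.toString());
        if (uri == null) {
            throw new IllegalArgumentException("No inner route mapped for key " + key);
        }
        return uri;
    }

    /** Hypothetical map: header value (key) -> inner endpoint URI (value). */
    static Map<String, String> sampleMap() {
        Map<String, String> map = new HashMap<>();
        map.put("addToCatalog", "seda:addToCatalog");
        map.put("findBook", "seda:findBook");
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(DISPATCH_HEADER, "findBook");
        System.out.println(selectInnerEndpoint(sampleMap(), headers));
    }
}
```

A custom dispatch strategy class plays the same role as selectInnerEndpoint here, except that the designer is free to inspect the whole exchange rather than a single header.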
Using the Routebox
A Routebox may be created by supplying the Routebox URI with one of the 2 options given below via the Camel Registry.
- A custom camel context
- A List of Routebuilders to be launched in a camel context automatically created by the Routebox
Routebox Example using a List of Routebuilders
Step 1: Create a Registry containing a list of routebuilders and a dispatch strategy. Then add the registry to the Camel Context
protected Context createJndiContext() throws Exception {
    Properties properties = new Properties();

    // jndi.properties is optional
    InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
    if (in != null) {
        log.debug("Using jndi.properties from classpath root");
        properties.load(in);
    } else {
        // set the default initial factory
        properties.put("java.naming.factory.initial", "org.apache.camel.util.jndi.CamelInitialContextFactory");
    }
    return new InitialContext(new Hashtable(properties));
}

protected JndiRegistry createRegistry() throws Exception {
    JndiRegistry registry = new JndiRegistry(createJndiContext());

    // Wire the routeDefinitions & dispatchStrategy to the outer camelContext
    // where the routebox is declared
    List<RouteBuilder> routes = new ArrayList<RouteBuilder>();
    routes.add(new SimpleRouteBuilder());
    registry.bind("registry", createInnerRegistry());
    registry.bind("routes", routes);
    registry.bind("strategy", new SimpleRouteDispatchStrategy());

    return registry;
}

private JndiRegistry createInnerRegistry() throws Exception {
    JndiRegistry innerRegistry = new JndiRegistry(createJndiContext());
    BookCatalog catalogBean = new BookCatalog();
    innerRegistry.bind("library", catalogBean);

    return innerRegistry;
}

protected CamelContext createCamelContext() throws Exception {
    return new DefaultCamelContext(createRegistry());
}
Step 2: Create a new routebox route to be launched in the camel context. Note that the # entries in the routeboxUri are matched to the created inner registry, routebuilder list and dispatchStrategy in the CamelContext Registry. Note that all routebuilders and associated routes are launched in the routebox created inner context
private String routeboxUri = "routebox:multipleRoutes?innerRegistry=#registry&routeBuilders=#routes&dispatchStrategy=#strategy";

public void testRouteboxDirectAsyncRequests() throws Exception {
    CamelContext context = createCamelContext();
    template = new DefaultProducerTemplate(context);
    template.start();

    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from(routeboxUri)
                .to("log:Routes operation performed?showAll=true");
        }
    });
    context.start();

    // Now use the ProducerTemplate to send the request to the routebox
    template.requestBodyAndHeader(routeboxUri, book, "ROUTE_DISPATCH_KEY", operation);
}
Complete Examples
RouteboxDefaultContextAndRouteBuilderTest.java
RouteboxDirectProducerOnlyTest.java
RouteboxDirectTest.java
RouteboxDispatchMapTest.java
RouteboxSedaTest.java
SimpleRouteBuilder.java
RouteboxDemoTestSupport.java
BookCatalog.java
Book.java
Further Details
https://issues.apache.org/jira/browse/CAMEL-3285
Tuesday, October 26, 2010
Weighted Round-Robin and Random Load balancing further simplified in Camel
I have further simplified the weighted round-robin and random load balancing support in Camel 2.6 based on comments and feedback from Claus Ibsen (http://davsclaus.blogspot.com/).
The changes are primarily in 2 areas
1> In Camel 2.5, in the case where the distributionRatio did not match the number of specified load balancing processors, I had opted to use best effort load balancing at runtime along with log warnings. In Camel 2.6, this behavior has now been changed to throw an exception during route startup to indicate that the distributionRatio does not match the number of processors.
2> In Camel 2.5, the distributionRatio was passed to DSL or Spring XML as a List as shown in
a> http://opensourceknowledge.blogspot.com/2010/10/added-support-for-weighted-round-robin.html
b> Camel Documentation (http://camel.apache.org/load-balancer.html).
I have modified this in Camel 2.6 based on Claus' recommendation and made it easier for the user by changing the distribution ratio to a String that expects an input of integer weights separated by a delimiter. The delimiter can also be influenced using distributionRatioDelimiter String (the default delimiter being ",").
Given below are examples of how this simplifies the DSL for Camel 2.6
public void testRoundRobin() throws Exception {
    x.expectedMessageCount(5);
    y.expectedMessageCount(2);
    z.expectedMessageCount(1);

    context.addRoutes(new RouteBuilder() {
        public void configure() {
            // START SNIPPET: example
            from("direct:start").loadBalance().
                weighted(true, "4,2,1").to("mock:x", "mock:y", "mock:z");
            // END SNIPPET: example
        }
    });
    context.start();

    sendMessages(1, 2, 3, 4, 5, 6, 7, 8);

    assertMockEndpointsSatisfied();
    // Note: these expectations are set after the assertion above, so they are not verified
    x.expectedBodiesReceived(1, 4, 6, 7, 8);
    y.expectedBodiesReceived(2, 5);
    z.expectedBodiesReceived(3);
}
For Spring XML examples, please check out the following links
weightedRoundRobinLoadBalance.xml
weightedRandomLoadBalance.xml
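The two changes described in this post (a delimited String of weights, and a hard failure when the number of weights does not match the number of processors) can be sketched in plain Java as follows. This is an illustration only and not the code inside Camel: DistributionRatioSketch and its parse method are hypothetical names, and the exception type is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Plain-Java sketch of parsing a distribution ratio String; NOT Camel's implementation. */
class DistributionRatioSketch {

    /**
     * Splits "4,2,1" (or "4:2:1" with delimiter ":") into integer weights and
     * fails fast when the number of weights differs from the number of processors.
     */
    static List<Integer> parse(String distributionRatio, String delimiter, int processorCount) {
        List<Integer> weights = new ArrayList<>();
        // Pattern.quote treats the delimiter literally, so "|" or "." are safe to use
        for (String part : distributionRatio.split(Pattern.quote(delimiter))) {
            weights.add(Integer.parseInt(part.trim()));
        }
        if (weights.size() != processorCount) {
            // Assumed exception type; the post only says an exception is thrown at route startup
            throw new IllegalArgumentException("distributionRatio has " + weights.size()
                    + " weights but there are " + processorCount + " processors");
        }
        return weights;
    }

    public static void main(String[] args) {
        System.out.println(parse("4,2,1", ",", 3));
        System.out.println(parse("4:2:1", ":", 3));
    }
}
```

Failing at startup rather than falling back to best effort balancing means a misconfigured ratio is noticed before any message is routed.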
Thursday, October 14, 2010
Added support for Weighted Round-Robin and Weighted Random Load balancing in Camel
I have just added a new capability in Camel 2.5, to offer Weighted Round-Robin and Weighted Random Load Balancer support.
Camel currently offers several different Load Balancing policies out of the box such as
- Round-Robin
- Random
- Sticky
- Topic Based and
- Fail-over
The weighted load balancing policy allows you to specify a processing load distribution ratio for each server with respect to others. You can specify this as a positive processing weight for each server. A larger number indicates that the server can handle a larger load. The weight is utilized to determine the payload distribution ratio to different processing endpoints with respect to others.
In addition to the weight, endpoint selection is then further refined based on an algorithm (round-robin/random).
Weighted Round Robin Load Balancing
Given below is a Camel example of the weighted round-robin capability in action. Note that there are 3 mock Camel endpoints (mock:x, mock:y and mock:z). The important thing is to associate a distribution ratio (a list of server weights) and wire it into a load balancer on the route, along with a boolean value specifying whether the distribution algorithm is round-robin (true in this case).
In the example below we expect endpoints mock:x, mock:y and mock:z to receive 4, 2 and 1 messages out of every 7 messages respectively. In addition we expect each endpoint to receive messages in strict round-robin order until its weight no longer permits further delivery of messages until the next cycle.
public void testRoundRobin() throws Exception {
    x.expectedMessageCount(5);
    y.expectedMessageCount(2);
    z.expectedMessageCount(1);

    context.addRoutes(new RouteBuilder() {
        public void configure() {
            ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
            distributionRatio.add(4);
            distributionRatio.add(2);
            distributionRatio.add(1);

            // START SNIPPET: example
            from("direct:start").loadBalance().
                weighted(true, distributionRatio).to("mock:x", "mock:y", "mock:z");
            // END SNIPPET: example
        }
    });
    context.start();

    sendMessages(1, 2, 3, 4, 5, 6, 7, 8);

    assertMockEndpointsSatisfied();
    // Note: these expectations are set after the assertion above, so they are not verified
    x.expectedBodiesReceived(1, 4, 6, 7, 8);
    y.expectedBodiesReceived(2, 5);
    z.expectedBodiesReceived(3);
}
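For readers who want to see why the bodies land the way they do, here is a minimal plain-Java sketch of the selection logic: each endpoint has a remaining allowance for the current cycle, endpoints are visited in order, exhausted endpoints are skipped, and the allowances are reset once all are spent. WeightedRoundRobinSketch is a hypothetical name and the code is a model of the behavior described above, not the load balancer class shipped with Camel.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of weighted round-robin selection; NOT Camel's load balancer class. */
class WeightedRoundRobinSketch {
    private final int[] weights;    // configured weights, e.g. 4, 2, 1
    private final int[] remaining;  // deliveries still allowed in the current cycle
    private int position;           // next endpoint index to consider

    WeightedRoundRobinSketch(int... weights) {
        int total = 0;
        for (int w : weights) {
            if (w < 0) {
                throw new IllegalArgumentException("weights must not be negative");
            }
            total += w;
        }
        if (total == 0) {
            throw new IllegalArgumentException("at least one weight must be positive");
        }
        this.weights = weights.clone();
        this.remaining = weights.clone();
    }

    /** Returns the index of the endpoint that receives the next message. */
    int next() {
        if (cycleExhausted()) {
            // Start a new cycle: restore every allowance and go back to the first endpoint
            System.arraycopy(weights, 0, remaining, 0, weights.length);
            position = 0;
        }
        while (true) {
            if (position >= remaining.length) {
                position = 0;
            }
            if (remaining[position] > 0) {
                remaining[position]--;
                return position++;
            }
            position++; // this endpoint has used up its weight, skip it
        }
    }

    private boolean cycleExhausted() {
        for (int r : remaining) {
            if (r > 0) {
                return false;
            }
        }
        return true;
    }

    /** Endpoint index chosen for each of the first n messages. */
    static List<Integer> sequence(int n, int... weights) {
        WeightedRoundRobinSketch lb = new WeightedRoundRobinSketch(weights);
        List<Integer> chosen = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            chosen.add(lb.next());
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(sequence(8, 4, 2, 1)); // prints [0, 1, 2, 0, 1, 0, 0, 0]
    }
}
```

With indexes 0, 1 and 2 standing for mock:x, mock:y and mock:z, messages 1, 4, 6, 7 and 8 go to x, messages 2 and 5 to y, and message 3 to z, which is the distribution listed in the test above.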
Weighted Random Load Balancing
Given below is a Camel example of the weighted random capability in action. Note that there are 3 mock Camel endpoints (mock:x, mock:y and mock:z). Again, the important thing is to associate a distribution ratio (a list of server weights) and wire it into a load balancer on the route, along with a boolean value specifying whether the distribution algorithm is round-robin (false in this case).
In the example below we expect endpoints mock:x, mock:y and mock:z to receive 4, 2 and 1 messages out of every 7 messages respectively. In addition we expect each endpoint to receive messages in a random order until its weight no longer permits further delivery of messages until the next cycle.
public void testRandom() throws Exception {
    x.expectedMessageCount(4);
    y.expectedMessageCount(2);
    z.expectedMessageCount(1);

    context.addRoutes(new RouteBuilder() {
        public void configure() {
            ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
            distributionRatio.add(4);
            distributionRatio.add(2);
            distributionRatio.add(1);

            // START SNIPPET: example
            from("direct:start").loadBalance().
                weighted(false, distributionRatio).to("mock:x", "mock:y", "mock:z");
            // END SNIPPET: example
        }
    });
    context.start();

    sendMessages(1, 2, 3, 4, 5, 6, 7);

    assertMockEndpointsSatisfied();
}
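The random variant can be modelled the same way. In this minimal plain-Java sketch an endpoint is picked at random among those that still have allowance left in the current cycle, so the order is random but the per-cycle totals always equal the weights. WeightedRandomSketch is a hypothetical name, the seed is arbitrary, and the code illustrates the behavior described above rather than reproducing Camel's class.

```java
import java.util.Arrays;
import java.util.Random;

/** Plain-Java model of weighted random selection; NOT Camel's load balancer class. */
class WeightedRandomSketch {
    private final int[] weights;    // configured weights, e.g. 4, 2, 1
    private final int[] remaining;  // deliveries still allowed in the current cycle
    private final Random random;

    WeightedRandomSketch(Random random, int... weights) {
        int total = 0;
        for (int w : weights) {
            if (w < 0) {
                throw new IllegalArgumentException("weights must not be negative");
            }
            total += w;
        }
        if (total == 0) {
            throw new IllegalArgumentException("at least one weight must be positive");
        }
        this.random = random;
        this.weights = weights.clone();
        this.remaining = weights.clone();
    }

    /** Returns the index of the endpoint that receives the next message. */
    int next() {
        if (cycleExhausted()) {
            System.arraycopy(weights, 0, remaining, 0, weights.length);
        }
        int index;
        do {
            index = random.nextInt(remaining.length);
        } while (remaining[index] == 0); // retry until an endpoint with allowance is hit
        remaining[index]--;
        return index;
    }

    private boolean cycleExhausted() {
        for (int r : remaining) {
            if (r > 0) {
                return false;
            }
        }
        return true;
    }

    /** Number of messages each endpoint receives out of the given total. */
    static int[] countsFor(int messages, long seed, int... weights) {
        WeightedRandomSketch lb = new WeightedRandomSketch(new Random(seed), weights);
        int[] counts = new int[weights.length];
        for (int i = 0; i < messages; i++) {
            counts[lb.next()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // One full cycle of 7 messages: the totals match the weights for any seed
        System.out.println(Arrays.toString(countsFor(7, 42L, 4, 2, 1))); // prints [4, 2, 1]
    }
}
```

This is why the test above can assert exact message counts (4, 2 and 1) even though the delivery order is not predictable.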
Complete Examples
WeightedRoundRobinLoadBalanceTest
WeightedRandomLoadBalanceTest
Further Details
https://issues.apache.org/activemq/browse/CAMEL-3197
Tuesday, September 28, 2010
Developed a capability in the Camel Quartz Component to schedule route activation, de-activation, suspension and resumption
I have just added a feature in Camel-quartz (Camel version 2.5) to facilitate schedule based route activation, de-activation, suspension and resumption.
In enterprise environments, it is frequently necessary to schedule routes to run at certain times during the day or night (e.g. routes tied to batch jobs).
Scheduling of routes typically involves the following capabilities
- Route activation - Starting a route at a given start time if the route is in a stopped state awaiting activation.
- Route de-activation - Shutting down an otherwise active and started route at a given time.
- Route suspension - Simply de-activating the route consumer endpoint URI declared on the from(...) section of the route from listening on a given port. The route is still considered as started, however, clients will not be able to send requests along the route.
- Route resumption - Resuming the listener on a formerly suspended route consumer endpoint URI. This route is ready to accept requests following route resumption and client requests will be accepted by the route consumer to be forwarded along the route.
The enforcement of the schedule is done via a ScheduledRoutePolicy that must be wired into the route. The ScheduledRoutePolicy currently supports 2 variants
- SimpleScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using dates, repeat counts and repeat intervals.
- CronScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using Cron Expressions.
SimpleScheduledRoutePolicy
In order to use a SimpleScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.SimpleScheduledRoutePolicy. The following property values must be set on it to be useful
- For starting routes on a schedule
  - routeStartDate - the initial scheduled date and time for route start
  - routeStartRepeatCount - number of times to repeat the job
  - routeStartRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
- For stopping routes on a schedule
  - routeStopDate - the initial scheduled date and time for route stop
  - routeStopRepeatCount - number of times to repeat the job
  - routeStopRepeatInterval - the time interval in milliseconds to trigger the next attempt to stop the route
  - routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
  - routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
- For suspending routes on a schedule
  - routeSuspendDate - the initial scheduled date and time for route suspension
  - routeSuspendRepeatCount - number of times to repeat the job
  - routeSuspendRepeatInterval - the time interval in milliseconds to trigger the next attempt to suspend the route
- For resuming routes on a schedule
  - routeResumeDate - the initial scheduled date and time for route resumption
  - routeResumeRepeatCount - number of times to repeat the job
  - routeResumeRepeatInterval - the time interval in milliseconds to trigger the next attempt to resume the route
@Test
public void testScheduledStartRoutePolicy() throws Exception {
    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");
    success.expectedMessageCount(1);

    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy();
            // Schedule the first start attempt 3 seconds from now
            long startTime = System.currentTimeMillis() + 3000L;
            policy.setRouteStartDate(new Date(startTime));
            policy.setRouteStartRepeatCount(1);
            policy.setRouteStartRepeatInterval(3000);

            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    // Stop the route so that the policy has something to start
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);

    Thread.sleep(5000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}
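As a rough aid for reading the date, repeat count and repeat interval properties together, the following plain-Java sketch lists the times at which a start attempt would be triggered. It assumes the usual Quartz simple-trigger semantics, where a repeat count of N means one initial firing plus N repeats; SimpleScheduleSketch and fireTimes are hypothetical names, and the sketch does not call Quartz or Camel.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/** Plain-Java sketch of simple schedule arithmetic; does not use Quartz or Camel. */
class SimpleScheduleSketch {

    /**
     * Assumption: a repeat count of N yields N + 1 firings
     * (the initial one plus N repeats), spaced intervalMillis apart.
     */
    static List<Date> fireTimes(Date start, int repeatCount, long intervalMillis) {
        List<Date> times = new ArrayList<>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(new Date(start.getTime() + i * intervalMillis));
        }
        return times;
    }

    public static void main(String[] args) {
        // Mirrors the values in the test above: start in 3 seconds, repeat once after 3 seconds
        Date start = new Date(System.currentTimeMillis() + 3000L);
        for (Date fireTime : fireTimes(start, 1, 3000L)) {
            System.out.println(fireTime);
        }
    }
}
```

Under that assumption, the policy in the test above attempts to start the route 3 seconds and again 6 seconds after the route is configured, and the first attempt already falls within the 5 second sleep.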
CronScheduledRoutePolicy
In order to use a CronScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy. The following property values must be set on it to be useful
- For starting routes on a schedule
  - routeStartTime - the initial scheduled date and time as a Cron Expression for route start
- For stopping routes on a schedule
  - routeStopTime - the initial scheduled date and time as a Cron Expression for route stop
  - routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
  - routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
- For suspending routes on a schedule
  - routeSuspendTime - the initial scheduled date and time as a Cron Expression for route suspension
- For resuming routes on a schedule
  - routeResumeTime - the initial scheduled date and time as a Cron Expression for route resumption
@Test
public void testScheduledStartRoutePolicy() throws Exception {
    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");
    success.expectedMessageCount(1);

    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
            // Cron expression: fire every 3 seconds
            policy.setRouteStartTime("*/3 * * * * ?");

            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    // Stop the route so that the policy has something to start
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);

    Thread.sleep(4000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}
Complete Examples
SimpleScheduledRoutePolicyTest.java
CronScheduledRoutePolicyTest.java
Further Details
https://issues.apache.org/activemq/browse/CAMEL-2936
Friday, September 3, 2010
Overriding a Property value set in a properties file with a JVM System Property
As of Camel 2.5, a new capability now exists in the most excellent and quite useful Camel Properties component to override a property value at runtime using a JVM System property without the need to restart the application to pick up the change.
To override a property value, create or edit a JVM System property with the same name as the property it replaces.
An example of how this can be done is given below:
// Turn off caching of loaded properties on the Properties component
PropertiesComponent pc = context.getComponent("properties", PropertiesComponent.class);
pc.setCache(false);

// JVM System properties named after the properties they replace
System.setProperty("cool.end", "mock:override");
System.setProperty("cool.result", "override");

context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("direct:start").to("properties:cool.end");
        from("direct:foo").to("properties:mock:{{cool.result}}");
    }
});
context.start();

// Both routes should now resolve to the overridden endpoint
getMockEndpoint("mock:override").expectedMessageCount(2);

template.sendBody("direct:start", "Hello World");
template.sendBody("direct:foo", "Hello Foo");

// Remove the overrides so they do not leak into other tests
System.clearProperty("cool.end");
System.clearProperty("cool.result");

assertMockEndpointsSatisfied();
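The lookup order behind the override can be sketched in plain Java. This is only a simplified model under my own assumptions, not the actual code of the Camel Properties component: OverridableProperties is a hypothetical class, and the file value "mock:result" is made up for the illustration.

```java
import java.util.Properties;

class OverridableProperties {
    private final Properties fileProperties;

    OverridableProperties(Properties fileProperties) {
        this.fileProperties = fileProperties;
    }

    // A JVM System property of the same name wins over the value from the file.
    // Nothing is cached, so a System property set later is seen on the next call.
    String resolve(String key) {
        String override = System.getProperty(key);
        return override != null ? override : fileProperties.getProperty(key);
    }

    public static void main(String[] args) {
        Properties file = new Properties();
        file.setProperty("cool.end", "mock:result"); // assumed file value

        OverridableProperties props = new OverridableProperties(file);
        System.out.println(props.resolve("cool.end")); // value from the file

        System.setProperty("cool.end", "mock:override");
        System.out.println(props.resolve("cool.end")); // value from the System property

        System.clearProperty("cool.end");
    }
}
```

Because the System property is read on every call, the sketch picks up a change without any restart, which is the behaviour the example above relies on when it calls setCache(false).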
Further Details
https://issues.apache.org/activemq/browse/CAMEL-2791
Monday, August 23, 2010
A Camel SIP component for Publishing/Subscribing to a SIP Presence Agent on Camel routes
I have committed a Camel SIP component (based on the Jain SIP Implementation) which allows for simple wiring of SIP based endpoints on a camel route that are capable of communication (publish/subscribe) with a SIP Presence Agent.
A SIP Presence Agent is a software entity that facilitates sending of event notifications to event subscribers and receiving of event publications from an event publisher.
The SIP component supports the creation of Producer (Event Publisher) and Consumer (Event Subscriber) endpoints which can
- register with a Presence Agent
- in the case of producers,
  - publish events and notifications to the Presence Agent
- in the case of consumers,
  - subscribe to the Presence Agent, expressing interest in a specific event
  - receive notifications from the Presence Agent at periodic intervals following state changes.
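The broker role of the Presence Agent can be pictured with a small in-memory model. This is a sketch under my own assumptions and has nothing to do with the Jain SIP API or with the component's code: PresenceAgentSketch and its methods are hypothetical, and notifications are delivered immediately rather than at periodic intervals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class PresenceAgentSketch {
    // Subscribers grouped by the event they expressed interest in.
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // A consumer (Event Subscriber) registers interest in a specific event.
    void subscribe(String eventName, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(eventName, k -> new ArrayList<>()).add(subscriber);
    }

    // A producer (Event Publisher) publishes to the agent, which notifies
    // every subscriber of that event and returns how many were notified.
    int publish(String eventName, String payload) {
        List<Consumer<String>> interested =
            subscribers.getOrDefault(eventName, new ArrayList<>());
        for (Consumer<String> subscriber : interested) {
            subscriber.accept(payload);
        }
        return interested.size();
    }

    public static void main(String[] args) {
        PresenceAgentSketch agent = new PresenceAgentSketch();
        agent.subscribe("evtid", body -> System.out.println("Received " + body));
        agent.publish("evtid", "EVENT_A"); // prints "Received EVENT_A"
    }
}
```

The point of the model is only that publisher and subscriber never talk to each other directly; both address the agent.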
Configuring the SIP Endpoint URI
The SIP protocol requires a number of headers to be sent along with the request in order to provide information about the sender, intermediaries, circuit details, call sequence etc. These headers may be injected into the camel context registry to be then looked up by the endpoint URI for injection into the publish/subscribe activity by the SIP Producer or Consumer.
The Mandatory Headers included in any SIP call are
- FromHeader - Mandatory Header containing details regarding the message originator.
- ViaHeader(s) - Header tracking the entities which received and forwarded the message to the next hop
- ContentTypeHeader - Header containing information regarding the encoding of the Payload. SIP is also capable of sending audio and many different binary and encoded formats.
- CallIdHeader - Header containing the conversation id, which is updated each time a producer or consumer participates in a conversation (i.e. uses the same dialog for an interaction).
- MaxForwardsHeader - Header specifying the maximum number of entities that may be used as 'Via' entities during communication
- EventHeader - Used by our Producer (Event Publisher) and Consumer (Event Subscriber) to provide the Presence Agent with details about the Event that they are interested in.
- ContactHeader - Header containing human readable contact details & information about the message sender
- ExpiresHeader - Header containing information regarding message expiry. Expired messages are discarded by the Consumer (Event Subscriber).
- ExtensionHeader - A Header available for adding any custom information to be sent to the message receiver.
In addition to headers the SIP URI may be further configured with the following details
- stackName - Must be a unique value for each endpoint. Name of the SIP Listener listening on a given host/port
- transport - Communication protocol. Must be TCP or UDP
- maxMessageSize - Maximum size for a given Message
- cacheConnections - Whether connections are cached by the SIP Stack
- automaticDialog - Whether every communication is done through an auto-created Dialog.
- contentType - the MIME Type for the payload sent on the wire
- receiveTimeout - The amount of time between sending a message and receiving an acknowledgement from the Presence Agent or Consumer (Event Subscriber). If there is no acknowledgement, retries may be attempted or the client may abort or cancel further retries.
- msgExpiration - The message expiration value that is matched against the incoming value in the ExpiresHeader. Used to check whether a message is considered expired.
Given these possible configuration values a SipEndpoint URI may be configured in the following way
// Simple instantiation
from("sip://johndoe@localhost:5154"
    + "?stackName=Subscriber"
    + "&toUser=agent"
    + "&toHost=localhost"
    + "&toPort=5152"
    + "&eventHeaderName=evtHdrName"
    + "&eventId=evtid");

// Or using Camel Context Registry based headers
from("sips://johndoe@localhost:5154"
    + "?stackName=Subscriber2"
    + "&fromHeader=#fromDetails"
    + "&toHeader=#toDetails"
    + "&viaHeader=#viaDetails"
    + "&contentTypeHeader=#contentTypeDetails"
    + "&callIdHeader=#callIdDetails"
    + "&maxForwardsHeader=#maxForwardsDetails"
    + "&eventHeader=#eventDetails"
    + "&contactHeader=#contactDetails"
    + "&expiresHeader=#expiresDetails"
    + "&extensionHeader=#extensionDetails"
    + "&eventId=evtid");
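Long URIs like the ones above are easy to mistype when concatenated by hand (a stray space before an "=" is enough to break an option). One way around that is to assemble the query string from a map. The sketch below is a plain-Java convenience of my own, not something the SIP component provides; SipUriSketch and build are hypothetical names, and no URL encoding is applied.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class SipUriSketch {
    // Builds scheme://user@host:port?key=value&key=value, keeping the map's iteration order.
    static String build(String scheme, String user, String host, int port,
                        Map<String, String> options) {
        String base = scheme + "://" + user + "@" + host + ":" + port;
        if (options.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return base + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in the order they were added.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("stackName", "Subscriber");
        options.put("toUser", "agent");
        options.put("toHost", "localhost");
        options.put("toPort", "5152");
        options.put("eventHeaderName", "evtHdrName");
        options.put("eventId", "evtid");
        System.out.println(build("sip", "johndoe", "localhost", 5154, options));
    }
}
```

The resulting string can be passed to from(...) in a route exactly like the hand-written version.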
Creating a Camel SIP Publisher
In the example below, a SIP Publisher is created to send SIP Event publications to
- a user "agent@localhost:5152". This is the address of the SIP Presence Agent which acts as a broker between the SIP Publisher and Subscriber
- using a SIP Stack named client
- using a registry based eventHeader called evtHdrName
- using a registry based eventId called evtid
- from a SIP Stack with Listener set up as user2@localhost:3534
- The Event being published is EVENT_A
- A mandatory header called REQUEST_METHOD is set to Request.PUBLISH, thereby setting up the endpoint as an Event publisher
producerTemplate.sendBodyAndHeader(
    "sip://agent@localhost:5152?stackName=client&eventHeaderName=evtHdrName&eventId=evtid&fromUser=user2&fromHost=localhost&fromPort=3534",
    "EVENT_A",
    "REQUEST_METHOD", Request.PUBLISH);
Creating a Camel SIP Subscriber
In the example below, a SIP Subscriber is created to receive SIP Event publications sent to
- a user "johndoe@localhost:5154"
- using a SIP Stack named Subscriber
- registering with a Presence Agent user called agent@localhost:5152
- using a registry based eventHeader called evtHdrName. The evtHdrName contains the Event, which is set to "Event_A"
- using a registry based eventId called evtid
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // Create PresenceAgent
            from("sip://agent@localhost:5152?stackName=PresenceAgent&presenceAgent=true&eventHeaderName=evtHdrName&eventId=evtid")
                .to("mock:neverland");

            // Create Sip Consumer (Event Subscriber)
            from("sip://johndoe@localhost:5154?stackName=Subscriber&toUser=agent&toHost=localhost&toPort=5152&eventHeaderName=evtHdrName&eventId=evtid")
                .to("log:ReceivedEvent?level=DEBUG")
                .to("mock:notification");
        }
    };
}
The Camel SIP component also ships with a Presence Agent that is meant to be used for testing and demo purposes only. An example of instantiating a Presence Agent is given above. Note that the Presence Agent is set up as the user agent@localhost:5152 and is capable of communicating with both the Publisher and the Subscriber. It has its own SIP stackName, distinct from those of the Publisher and the Subscriber. While it is set up as a Camel Consumer, it does not actually send any messages along the route to the endpoint "mock:neverland".
Complete Examples
PublishSubscribeTest
Further Details
https://issues.apache.org/activemq/browse/CAMEL-2943
Thursday, August 19, 2010
Customizing Netty Endpoints using Custom Pipeline Factories
I have added an enhancement to the Netty component in Camel 2.5.0 that makes Netty Endpoints completely configurable by allowing custom channel pipeline factories to be added on both Producer and Consumer endpoints.
This capability facilitates the creation of custom channel pipelines. Custom channel pipelines give the user complete control over the handler/interceptor chain: custom handler(s), encoder(s) and decoder(s) can be inserted in a very simple way, without having to specify them in the Netty Endpoint URL.
Given below is a step by step procedure for incorporating custom channel pipeline factories into a Netty Endpoint
Step 1: Create the custom channel pipeline factory
An example of client-side and server-side channel pipeline factories is given below. In this example, please note the following
- A Producer linked channel pipeline factory must extend the abstract class ClientPipelineFactory
- A Consumer linked channel pipeline factory must extend the abstract class ServerPipelineFactory
- The classes can optionally override the getPipeline() method in order to insert custom handler(s), encoder(s) and decoder(s)
public class SampleClientChannelPipelineFactory extends ClientPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;

    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;

        ChannelPipeline channelPipeline = Channels.pipeline();

        // Split the incoming bytes into lines, then convert to and from UTF-8 strings
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));

        return channelPipeline;
    }

    public boolean isfactoryInvoked() {
        return invoked;
    }
}

public class SampleServerChannelPipelineFactory extends ServerPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;

    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;

        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));

        return channelPipeline;
    }

    public boolean isfactoryInvoked() {
        return invoked;
    }
}
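What makes a pipeline useful is that named stages run in the order in which they were added. The toy model below shows just that idea in plain Java. It is a sketch under my own assumptions and is not Netty's ChannelPipeline: PipelineSketch, addLast and fire are hypothetical here, and every stage simply maps one string to another.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class PipelineSketch {
    // LinkedHashMap keeps the stages in insertion order.
    private final Map<String, UnaryOperator<String>> stages = new LinkedHashMap<>();

    // Appends a named stage to the end of the chain.
    PipelineSketch addLast(String name, UnaryOperator<String> stage) {
        stages.put(name, stage);
        return this;
    }

    // Passes the message through every stage, first added first.
    String fire(String message) {
        String current = message;
        for (UnaryOperator<String> stage : stages.values()) {
            current = stage.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch()
            .addLast("strip-delimiter", s -> s.endsWith("\n") ? s.substring(0, s.length() - 1) : s)
            .addLast("upper-case", String::toUpperCase);
        System.out.println(pipeline.fire("hello\n")); // prints HELLO
    }
}
```

A custom pipeline factory gives you the same kind of control over a real Netty chain: you decide which handlers exist and in which order they see the data.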
Step 2: Register the custom channel pipeline factory with the Camel Context
In order to register the custom channel pipeline factory you need to do the following
protected void addPipelineFactoryToRegistry() throws Exception {
    Registry registry = camelContext.getRegistry();
    clientPipelineFactory = new TestClientChannelPipelineFactory();
    serverPipelineFactory = new TestServerChannelPipelineFactory();

    // Bind the factories under the names used later in the endpoint URIs
    registry.bind("cpf", clientPipelineFactory);
    registry.bind("spf", serverPipelineFactory);
}
Step 3: Instructing the Netty Endpoint to utilize the Camel Context registered channel pipeline factory
Netty Producer endpoints may now be configured to use the pipeline factories as follows.
private void sendRequest() throws Exception {
    // Async request
    response = (String) producerTemplate.requestBody(
        "netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true",
        "Forest Gump describing Vietnam...");
}
Netty Consumer endpoints may be configured as follows
protected Context createJndiContext() throws Exception {
    Properties properties = new Properties();

    // jndi.properties is optional
    InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
    if (in != null) {
        log.debug("Using jndi.properties from classpath root");
        properties.load(in);
    } else {
        properties.put("java.naming.factory.initial", "org.apache.camel.util.jndi.CamelInitialContextFactory");
    }
    return new InitialContext(new Hashtable(properties));
}

public void createCamelRoute() throws Exception {
    CamelContext context = new DefaultCamelContext(new JndiRegistry(createJndiContext()));

    context.addRoutes(new RouteBuilder() {
        public void configure() {
            // The consumer endpoint looks up the server pipeline factory bound as "spf"
            from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&textline=true")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        exchange.getOut().setBody("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'");
                    }
                });
        }
    });
    context.start();
}
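The "#cpf" and "#spf" values in the endpoint URIs are references to the objects bound in Step 2 rather than literal strings. The sketch below models that lookup in plain Java. It is a simplification under my own assumptions and not Camel's registry code: RegistrySketch and resolve are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class RegistrySketch {
    private final Map<String, Object> beans = new HashMap<>();

    // Binds an object under a name, as in Step 2.
    void bind(String name, Object bean) {
        beans.put(name, bean);
    }

    // Resolves a URI option value: "#name" is looked up in the registry,
    // anything else is returned unchanged as a literal.
    Object resolve(String optionValue) {
        if (!optionValue.startsWith("#")) {
            return optionValue;
        }
        String name = optionValue.substring(1);
        Object bean = beans.get(name);
        if (bean == null) {
            throw new IllegalArgumentException("No bean bound as " + name);
        }
        return bean;
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        Object clientPipelineFactory = new Object(); // stand-in for a real factory
        registry.bind("cpf", clientPipelineFactory);

        System.out.println(registry.resolve("#cpf") == clientPipelineFactory); // prints true
        System.out.println(registry.resolve("true")); // prints true (a literal value)
    }
}
```

This is why the name used in registry.bind(...) and the name after the "#" in the URI have to match exactly.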
Sample code
NettyCustomPipelineFactoryAsynchTest.java
NettyCustomPipelineFactorySynchTest.java
Further details
https://issues.apache.org/activemq/browse/CAMEL-2713