Tuesday, December 28, 2010

Developed a new Routebox component encapsulating camel contexts and routes as camel endpoints

A couple of weeks ago, I contributed an interesting and unique new Camel component called Routebox. Please find the details regarding the Routebox component below.

Before we proceed, a small note about Camel routes

Camel routes are crafted using Java DSL or Spring XML to link URI endpoints representing protocols, technologies and infrastructure components based on well established integration patterns and integration requirements. The routes themselves are discrete, standalone and isolated. Routes are hosted by a camel context which is responsible for lifecycle management and administrative control of hosted routes. Routes are in and of themselves not layered in any way and the same is true of camel contexts.

Now let us identify the need for a Routebox

In integration environments with complex, multi-faceted integration needs and a wide array of endpoint technologies, it is often necessary to craft an integration solution by layering camel routes, effectively organizing them into

  • Coarse grained routes - routes with Routebox endpoints that act as a facade for a collection of inner or lower level routes that are
    • Application specific --> HR routes, Sales routes etc
    • Objective specific --> Shipping routes, Fulfillment routes etc
    • Technology specific --> Database routes, Caching routes, polling routes etc
  • Fine grained routes - routes that execute a specific business and/or integration pattern.

A request sent to a Routebox endpoint on a coarse grained route is delegated to the inner fine grained routes that achieve a specific integration objective. The Routebox then collects the final inner result, and the exchange continues to the next step along the coarse grained route.

The idea behind a Routebox is to act as a blackbox for a collection of fine grained routes and direct payloads received from the Routebox endpoint to specific inner fine grained routes based on a user defined internal routing strategy or a map that matches headers in payloads to determine the inner routes to which payloads are directed.

This new component facilitates encapsulation and indirection of requests received via a routebox consumer endpoint or producer endpoint on a coarse grained camel route to a set of inner routes declared in an inner camel context.

Further Routebox details

A Routebox endpoint consists of a Routebox URI of the type

routebox:routebox-name?...

Producers do not need to know the inner route endpoint URIs; they can simply invoke the Routebox endpoint URI.

The Routebox can be applied on a route as a route consumer or as a route producer.

Route Producer endpoints are of two types
  • Endpoints that direct payloads to a routebox consumer
  • Endpoints that host an inner camel context and direct payloads to inner routes hosted by the camel context

The Routebox URI has a query parameter (sendToConsumer) whose value specifies the type of producer endpoint.

The Routebox itself is currently JVM bound and uses an internal protocol (SEDA or Direct) to receive incoming requests. The choice of internal protocol is controlled by a query parameter (innerProtocol).

The support for SEDA provides automatic support for asynchronous requests to be sent to the routebox endpoint. Similarly support for the Direct protocol provides support for synchronous requests.

The inner routes in the routebox support all manner of valid camel endpoints. The inner context is a managed entity controlled by the routebox endpoint. Starting or stopping the routebox endpoint automatically triggers the stoppage of any inner routes and removal of the associated inner camel context for routebox consumers and producers hosting inner contexts and routes.

The dispatching to inner routes is determined by a dispatch strategy supplied by the routebox designer (a user defined class that implements org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy). This gives the routebox designer complete control over how incoming payloads are directed to different fine grained routes. Alternatively, the routebox designer can supply a HashMap (HashMap<String, String>) consisting of a header value (key) and endpointUri (value). The user can then set the exchange header to a key, which dictates the fine grained route to which the payload is sent.
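
The map based dispatch described above can be pictured with a minimal plain-Java sketch. It is not the component's own code: the class name DispatchMapSketch, the method resolveInnerUri and the example keys and URIs are assumptions made up for illustration. It only shows the lookup step, where a header value is used as the key that selects an inner route URI.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the Routebox component's own code.
final class DispatchMapSketch {

    // Look up the inner route URI mapped to the value of the dispatch header.
    static String resolveInnerUri(Map<String, String> dispatchMap, String headerValue) {
        String innerUri = dispatchMap.get(headerValue);
        if (innerUri == null) {
            throw new IllegalArgumentException("No inner route mapped for key: " + headerValue);
        }
        return innerUri;
    }

    public static void main(String[] args) {
        // Keys are header values, values are inner route URIs (both made up here).
        Map<String, String> dispatchMap = new HashMap<>();
        dispatchMap.put("addToCatalog", "seda:addToCatalog");
        dispatchMap.put("findBook", "seda:findBook");

        System.out.println(resolveInnerUri(dispatchMap, "findBook"));
    }
}
```

Failing fast on an unmapped key is a design choice of this sketch; how the real component treats an unknown key is not covered here.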

Using the Routebox

A Routebox may be created by supplying the Routebox URI with one of the 2 options given below via the Camel Registry.
  • A custom camel context 
  • A List of Routebuilders to be launched in a camel context automatically created by the Routebox

Routebox Example using a List of Routebuilders

Step 1: Create a Registry containing a list of routebuilders and a dispatch strategy. Then add the registry to the Camel Context

protected Context createJndiContext() throws Exception {
        Properties properties = new Properties();

        // jndi.properties is optional
        InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
        if (in != null) {
            log.debug("Using jndi.properties from classpath root");
            properties.load(in);
        } else {
            // set the default initial factory
            properties.put("java.naming.factory.initial", "org.apache.camel.util.jndi.CamelInitialContextFactory");
        }
        return new InitialContext(new Hashtable<Object, Object>(properties));
    }

    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = new JndiRegistry(createJndiContext());
        
        // Wire the routeDefinitions & dispatchStrategy to the outer camelContext 
        // where the routebox is declared
        List<RouteBuilder> routes = new ArrayList<RouteBuilder>();
        routes.add(new SimpleRouteBuilder());
        registry.bind("registry", createInnerRegistry());
        registry.bind("routes", routes);
        registry.bind("strategy", new SimpleRouteDispatchStrategy());
        
        return registry;
    }

    private JndiRegistry createInnerRegistry() throws Exception {
        JndiRegistry innerRegistry = new JndiRegistry(createJndiContext());
        BookCatalog catalogBean = new BookCatalog();
        innerRegistry.bind("library", catalogBean);        
        
        return innerRegistry;
    }

    protected CamelContext createCamelContext() throws Exception {
        return new DefaultCamelContext(createRegistry());
    }

Step 2: Create a new routebox route to be launched in the camel context. The # entries in the routeboxUri are matched to the inner registry, routebuilder list and dispatchStrategy bound in the CamelContext Registry. All routebuilders and their associated routes are launched in the inner context created by the routebox.

private String routeboxUri = "routebox:multipleRoutes?innerRegistry=#registry&routeBuilders=#routes&dispatchStrategy=#strategy";

    public void testRouteboxDirectAsyncRequests() throws Exception {
        CamelContext context = createCamelContext();
        template = new DefaultProducerTemplate(context);
        template.start();        
     
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from(routeboxUri)
                    .to("log:Routes operation performed?showAll=true");
            }
        });
        context.start();

        // Now use the ProducerTemplate to send the request to the routebox
        template.requestBodyAndHeader(routeboxUri, book, "ROUTE_DISPATCH_KEY", operation);
    }

Complete Examples
RouteboxDefaultContextAndRouteBuilderTest.java
RouteboxDirectProducerOnlyTest.java
RouteboxDirectTest.java
RouteboxDispatchMapTest.java
RouteboxSedaTest.java
SimpleRouteBuilder.java
RouteboxDemoTestSupport.java
BookCatalog.java
Book.java

Further Details
https://issues.apache.org/jira/browse/CAMEL-3285

Tuesday, October 26, 2010

Weighted Round-Robin and Random Load balancing further simplified in Camel

I have further simplified the weighted round-robin and random load balancing support in Camel 2.6 based on comments and feedback from Claus Ibsen (http://davsclaus.blogspot.com/).

The changes are primarily in 2 areas

1> In Camel 2.5, in the case where the distributionRatio did not match the number
     of specified load balancing processors, I had opted to use best effort load balancing
     at runtime along with log warnings. In Camel 2.6, this behavior has now been
     changed to throw an exception during route startup to indicate that the
     distributionRatio does not match the number of processors.

2> In Camel 2.5, the distributionRatio was passed to DSL or Spring XML as a List,
     as shown in
         a> http://opensourceknowledge.blogspot.com/2010/10/added-support-for-weighted-round-robin.html
         b> Camel Documentation (http://camel.apache.org/load-balancer.html).
     I have modified this in Camel 2.6 based on Claus' recommendation and made it
     easier for the user by changing the distribution ratio to a String that expects
     integer weights separated by a delimiter. The delimiter can be changed using the
     distributionRatioDelimiter String (the default delimiter being ",").
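
The two changes above (a delimited String of weights, and a startup failure when the number of weights does not match the number of processors) can be sketched in plain Java. This is an illustration under assumptions, not the Camel source: the class DistributionRatioSketch, the method parse and the exception message are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the Camel implementation.
final class DistributionRatioSketch {

    // Parse a delimited ratio such as "4,2,1" and check it against the processor count.
    static List<Integer> parse(String distributionRatio, String delimiter, int processorCount) {
        List<Integer> weights = new ArrayList<>();
        // Quote the delimiter so characters such as "|" are not treated as regex syntax.
        for (String token : distributionRatio.split(Pattern.quote(delimiter))) {
            weights.add(Integer.parseInt(token.trim()));
        }
        if (weights.size() != processorCount) {
            throw new IllegalArgumentException("distributionRatio has " + weights.size()
                    + " weights but there are " + processorCount + " processors");
        }
        return weights;
    }

    public static void main(String[] args) {
        System.out.println(parse("4,2,1", ",", 3));
    }
}
```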

Given below are examples of how this simplifies the DSL for Camel 2.6

public void testRoundRobin() throws Exception {

        x.expectedMessageCount(5);
        y.expectedMessageCount(2);
        z.expectedMessageCount(1);

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                
                // START SNIPPET: example
                from("direct:start").loadBalance().
                weighted(true, "4,2,1").to("mock:x", "mock:y", "mock:z");
                // END SNIPPET: example
            }
        });
        context.start();
        
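        // Register the body expectations before sending; expectations set after
        // assertMockEndpointsSatisfied() are never verified
        x.expectedBodiesReceived(1, 4, 6, 7, 8);
        y.expectedBodiesReceived(2, 5);
        z.expectedBodiesReceived(3);
        
        sendMessages(1, 2, 3, 4, 5, 6, 7, 8);
        
        assertMockEndpointsSatisfied();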
    }

For Spring XML examples, please check out the following links

weightedRoundRobinLoadBalance.xml
weightedRandomLoadBalance.xml

Thursday, October 14, 2010

Added support for Weighted Round-Robin and Weighted Random Load balancing in Camel

I have just added a new capability in Camel 2.5 that offers Weighted Round-Robin and Weighted Random Load Balancer support.

Camel currently offers several different Load Balancing policies out of the box such as
  • Round-Robin
  • Random
  • Sticky
  • Topic Based and
  • Fail-over
However, in many enterprise environments where server nodes of unequal processing power & performance characteristics are utilized to host services and processing endpoints, it is frequently necessary to distribute processing load based on individual server capabilities so that some endpoints are not unfairly burdened with requests. Simple round-robin or random load balancing does not alleviate problems of this nature. A Weighted Round-Robin and/or Weighted Random load balancer is invaluable in this regard.

The weighted load balancing policy allows you to specify a processing load distribution ratio for each server with respect to others. You can specify this as a positive processing weight for each server. A larger number indicates that the server can handle a larger load. The weight is utilized to determine the payload distribution ratio to different processing endpoints with respect to others.

In addition to the weight, endpoint selection is then further refined based on an algorithm (round-robin/random).

Weighted Round Robin Load Balancing

Given below is a Camel example of the weighted round-robin capability in action. Note that there are 3 mock camel endpoints (mock:x, mock:y and mock:z). The important thing is to associate a distribution ratio (list of server weights) and wire it into a load balancer on the route along with a boolean value specifying whether the distribution algorithm is round-robin (true in this case).

In the example below we expect endpoints mock:x, mock:y and mock:z to receive 4, 2 and 1 messages out of every 7 messages respectively. In addition we expect each endpoint to receive messages in strict round-robin order until its weight no longer permits further delivery of messages until the next cycle.

public void testRoundRobin() throws Exception {

        x.expectedMessageCount(5);
        y.expectedMessageCount(2);
        z.expectedMessageCount(1);

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
                distributionRatio.add(4);
                distributionRatio.add(2);
                distributionRatio.add(1);
                
                // START SNIPPET: example
                from("direct:start").loadBalance().
                weighted(true, distributionRatio).to("mock:x", "mock:y", "mock:z");
                // END SNIPPET: example
            }
        });
        context.start();
        
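        // Register the body expectations before sending; expectations set after
        // assertMockEndpointsSatisfied() are never verified
        x.expectedBodiesReceived(1, 4, 6, 7, 8);
        y.expectedBodiesReceived(2, 5);
        z.expectedBodiesReceived(3);
        
        sendMessages(1, 2, 3, 4, 5, 6, 7, 8);
        
        assertMockEndpointsSatisfied();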
    }
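
The selection order described above can be sketched in plain Java without Camel. This is an illustration under assumptions and not necessarily how the Camel load balancer is implemented: the class WeightedRoundRobinSketch and its next() method are made up for this purpose. Each endpoint gets a remaining quota equal to its weight; endpoints are visited in order, exhausted ones are skipped, and the quotas are reset once all of them are used up.

```java
// Hypothetical illustration only; not the Camel implementation.
final class WeightedRoundRobinSketch {
    private final int[] weights;
    private final int[] remaining;
    private int index;

    WeightedRoundRobinSketch(int... weights) {
        if (weights.length == 0) {
            throw new IllegalArgumentException("At least one weight is required");
        }
        for (int weight : weights) {
            if (weight <= 0) {
                throw new IllegalArgumentException("Weights must be positive");
            }
        }
        this.weights = weights.clone();
        this.remaining = weights.clone();
    }

    // Returns the index of the endpoint that should receive the next message.
    synchronized int next() {
        if (allExhausted()) {
            // Start a new cycle with full quotas.
            System.arraycopy(weights, 0, remaining, 0, weights.length);
            index = 0;
        }
        while (true) {
            if (index >= remaining.length) {
                index = 0;
            }
            if (remaining[index] > 0) {
                remaining[index]--;
                return index++;
            }
            index++; // this endpoint has used up its quota for the cycle
        }
    }

    private boolean allExhausted() {
        for (int quota : remaining) {
            if (quota > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        WeightedRoundRobinSketch balancer = new WeightedRoundRobinSketch(4, 2, 1);
        String[] endpoints = {"mock:x", "mock:y", "mock:z"};
        for (int message = 1; message <= 8; message++) {
            System.out.println(message + " -> " + endpoints[balancer.next()]);
        }
    }
}
```

With weights 4, 2 and 1 and eight messages, this sketch selects x, y, z, x, y, x, x, x, so x receives messages 1, 4, 6, 7 and 8, y receives 2 and 5, and z receives 3, which matches the bodies expected in the test above.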

Weighted Random Load Balancing

Given below is a Camel example of the weighted random capability in action. Note that there are 3 mock camel endpoints (mock:x, mock:y and mock:z). Again in this case, the important thing is to associate a distribution ratio (list of server weights) and wire it into a load balancer on the route along with a boolean value specifying whether the distribution algorithm is round-robin (false in this case).

In the example below we expect endpoints mock:x, mock:y and mock:z to receive 4, 2 and 1 messages out of every 7 messages respectively. In addition we expect each endpoint to receive messages in a random order until its weight no longer permits further delivery of messages until the next cycle.

public void testRandom() throws Exception {

        x.expectedMessageCount(4);
        y.expectedMessageCount(2);
        z.expectedMessageCount(1);

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
                distributionRatio.add(4);
                distributionRatio.add(2);
                distributionRatio.add(1);
                
                // START SNIPPET: example
                from("direct:start").loadBalance().
                weighted(false, distributionRatio).to("mock:x", "mock:y", "mock:z");
                // END SNIPPET: example
            }
        });
        context.start();
        
        sendMessages(1, 2, 3, 4, 5, 6, 7);
        
        assertMockEndpointsSatisfied();
    }
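
The weighted random behavior described above can also be sketched in plain Java. As before, this is an assumption-laden illustration rather than the Camel source: WeightedRandomSketch and next() are hypothetical names. An endpoint is picked at random among those that still have quota left in the current cycle, so the order is random but every cycle of 7 messages still splits 4, 2 and 1.

```java
import java.util.Random;

// Hypothetical illustration only; not the Camel implementation.
final class WeightedRandomSketch {
    private final int[] weights;
    private final int[] remaining;
    private final Random random;

    WeightedRandomSketch(Random random, int... weights) {
        if (weights.length == 0) {
            throw new IllegalArgumentException("At least one weight is required");
        }
        for (int weight : weights) {
            if (weight <= 0) {
                throw new IllegalArgumentException("Weights must be positive");
            }
        }
        this.random = random;
        this.weights = weights.clone();
        this.remaining = weights.clone();
    }

    // Returns the index of a randomly chosen endpoint that still has quota left.
    synchronized int next() {
        if (allExhausted()) {
            // Start a new cycle with full quotas.
            System.arraycopy(weights, 0, remaining, 0, weights.length);
        }
        while (true) {
            int candidate = random.nextInt(remaining.length);
            if (remaining[candidate] > 0) {
                remaining[candidate]--;
                return candidate;
            }
            // Candidate is exhausted for this cycle; draw again.
        }
    }

    private boolean allExhausted() {
        for (int quota : remaining) {
            if (quota > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        WeightedRandomSketch balancer = new WeightedRandomSketch(new Random(), 4, 2, 1);
        int[] counts = new int[3];
        for (int message = 1; message <= 7; message++) {
            counts[balancer.next()]++;
        }
        System.out.println("x=" + counts[0] + " y=" + counts[1] + " z=" + counts[2]);
    }
}
```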

Complete Examples
WeightedRoundRobinLoadBalanceTest
WeightedRandomLoadBalanceTest

Further Details
https://issues.apache.org/activemq/browse/CAMEL-3197

Tuesday, September 28, 2010

Developed a capability in the Camel Quartz Component to schedule route activation, de-activation, suspension and resumption

I have just added a feature in Camel-quartz (Camel version 2.5) to facilitate schedule based route activation, de-activation, suspension and resumption.

In enterprise environments, it is frequently necessary to schedule routes to run at certain times during the day or night (e.g. routes tied to batch jobs).

Scheduling of routes typically involves the following capabilities

  • Route activation - Starting a route at a given start time if the route is in a stopped state awaiting activation.
  • Route de-activation - Shutting down an otherwise active and started route at a given time. 
  • Route suspension - Simply de-activating the route consumer endpoint URI declared on the from(...) section of the route from listening on a given port. The route is still considered as started, however, clients will not be able to send requests along the route. 
  • Route resumption - Resuming the listener on a formerly suspended route consumer endpoint URI. This route is ready to accept requests following route resumption and client requests will be accepted by the route consumer to be forwarded along the route.

The enforcement of the schedule is done via a ScheduledRoutePolicy that must be wired into the route. The ScheduledRoutePolicy currently supports 2 variants
  1. SimpleScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using dates, repeat counts and repeat intervals.
  2. CronScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using Cron Expressions.

SimpleScheduledRoutePolicy

In order to use a SimpleScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.SimpleScheduledRoutePolicy.  The following property values must be set on it to be useful
  • For Starting routes on a Schedule
    • routeStartDate - the initial scheduled Date and time for route start
    • routeStartRepeatCount - no of times to repeat the job
    • routeStartRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
  • For Stopping routes on a Schedule
    • routeStopDate - the initial scheduled Date and time for route stop
    • routeStopRepeatCount - no of times to repeat the job
    • routeStopRepeatInterval - the time interval in milliseconds to trigger the next attempt to stop the route
    • routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
    • routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
  • For Suspending routes on a Schedule
    • routeSuspendDate - the initial scheduled Date and time for route suspension
    • routeSuspendRepeatCount - no of times to repeat the job
    • routeSuspendRepeatInterval - the time interval in milliseconds to trigger the next attempt to suspend the route
  • For Resuming routes on a Schedule
    • routeResumeDate - the initial scheduled Date and time for route resumption
    • routeResumeRepeatCount - no of times to repeat the job
    • routeResumeRepeatInterval - the time interval in milliseconds to trigger the next attempt to resume the route

Given below is an example using a SimpleScheduledRoutePolicy

@Test
public void testScheduledStartRoutePolicy() throws Exception {
    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");        
        
    success.expectedMessageCount(1);
        
    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {   
            SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy();
            long startTime = System.currentTimeMillis() + 3000L;
            policy.setRouteStartDate(new Date(startTime));
            policy.setRouteStartRepeatCount(1);
            policy.setRouteStartRepeatInterval(3000);
                
            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
        
    Thread.sleep(5000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}
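
To make the date, repeat count and repeat interval properties listed earlier more concrete, here is a minimal plain-Java sketch of the attempt times they describe. It assumes the usual Quartz simple trigger semantics, where a repeat count of N means the job fires N + 1 times in total; whether the policy follows this exactly is an assumption, and SimpleScheduleSketch and fireTimes are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Hypothetical illustration only; assumes a repeat count of N gives N + 1 firings.
final class SimpleScheduleSketch {

    // Compute the times at which a start/stop/suspend/resume attempt would be made.
    static List<Date> fireTimes(Date firstFireTime, int repeatCount, long repeatIntervalMillis) {
        List<Date> times = new ArrayList<>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(new Date(firstFireTime.getTime() + i * repeatIntervalMillis));
        }
        return times;
    }

    public static void main(String[] args) {
        // Same values as the example above: first attempt in 3 seconds, one repeat, 3 seconds apart.
        Date start = new Date(System.currentTimeMillis() + 3000L);
        for (Date time : fireTimes(start, 1, 3000L)) {
            System.out.println(time);
        }
    }
}
```

Under that assumption, the example above would attempt to start the route roughly 3 and 6 seconds after the policy is configured, which is consistent with the test checking the route status after a 5 second sleep.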

CronScheduledRoutePolicy

In order to use a CronScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy.  The following property values must be set on it to be useful
  • For Starting routes on a Schedule
    • routeStartTime - the initial scheduled Date and time as a Cron Expression for route start
  • For Stopping routes on a Schedule
    • routeStopTime - the initial scheduled Date and time as a Cron Expression for route stop
    • routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
    • routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
  • For Suspending routes on a Schedule
    • routeSuspendTime - the initial scheduled Date and time as a Cron Expression for route suspension
  • For Resuming routes on a Schedule
    • routeResumeTime - the initial scheduled Date and time as a Cron Expression for route resumption

Given below is an example using a CronScheduledRoutePolicy

@Test
public void testScheduledStartRoutePolicy() throws Exception {

    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");        
        
    success.expectedMessageCount(1);
        
    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {    
            CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
            policy.setRouteStartTime("*/3 * * * * ?");
                
            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
        
    Thread.sleep(4000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}
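
For readers unfamiliar with Quartz cron expressions, the small plain-Java sketch below labels the fields of the expression used above. Quartz expressions have six mandatory fields (seconds, minutes, hours, day-of-month, month, day-of-week) and an optional year, so "*/3 * * * * ?" fires every 3 seconds. The class CronFieldsSketch and the method describe are hypothetical helpers, not part of Quartz or Camel, and the sketch does not validate field contents.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; labels fields but does not validate their contents.
final class CronFieldsSketch {
    private static final String[] FIELD_NAMES = {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    // Split a Quartz cron expression into named fields, in declaration order.
    static Map<String, String> describe(String cronExpression) {
        String[] parts = cronExpression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > FIELD_NAMES.length) {
            throw new IllegalArgumentException("Expected 6 or 7 fields but found " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(FIELD_NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        describe("*/3 * * * * ?").forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```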

Complete Examples

SimpleScheduledRoutePolicyTest.java
CronScheduledRoutePolicyTest.java

Further Details

https://issues.apache.org/activemq/browse/CAMEL-2936

Friday, September 3, 2010

Overriding a Property value set in a properties file with a JVM System Property

As of Camel 2.5, a new capability now exists in the most excellent and quite useful Camel Properties component to override a property value at runtime using a JVM System property without the need to restart the application to pick up the change.
This is done by creating or editing a JVM System property with the same name as the property it replaces.

An example of how this can be done is given below

PropertiesComponent pc = context.getComponent("properties", PropertiesComponent.class);
pc.setCache(false);
        
System.setProperty("cool.end", "mock:override");
System.setProperty("cool.result", "override");

context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("direct:start").to("properties:cool.end");
        from("direct:foo").to("properties:mock:{{cool.result}}");
    }
});
context.start();

getMockEndpoint("mock:override").expectedMessageCount(2);

template.sendBody("direct:start", "Hello World");
template.sendBody("direct:foo", "Hello Foo");

System.clearProperty("cool.end");
System.clearProperty("cool.result");
        
assertMockEndpointsSatisfied();
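
The lookup order behind this override can be sketched in plain Java. This is only an illustration of the idea (a JVM System property of the same name wins over the value from the properties file), not the Properties component's code; PropertyOverrideSketch, resolve and the sketch.cool.end key are hypothetical.

```java
import java.util.Properties;

// Hypothetical illustration only; not the Camel Properties component.
final class PropertyOverrideSketch {

    // A JVM System property with the same name takes precedence over the file value.
    static String resolve(Properties fileProperties, String key) {
        String override = System.getProperty(key);
        return override != null ? override : fileProperties.getProperty(key);
    }

    public static void main(String[] args) {
        Properties fileProperties = new Properties();
        fileProperties.setProperty("sketch.cool.end", "mock:result");

        System.out.println(resolve(fileProperties, "sketch.cool.end"));

        System.setProperty("sketch.cool.end", "mock:override");
        try {
            System.out.println(resolve(fileProperties, "sketch.cool.end"));
        } finally {
            // Always clean up so the override does not leak into other code.
            System.clearProperty("sketch.cool.end");
        }
    }
}
```

Because the System property is read on every lookup in this sketch, a change is picked up without a restart, which is also why the example above turns caching off on the component.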

Further Details
https://issues.apache.org/activemq/browse/CAMEL-2791

Monday, August 23, 2010

A Camel SIP component for Publishing/Subscribing to a SIP Presence Agent on Camel routes

I have committed a Camel SIP component (based on the JAIN SIP implementation) which allows for simple wiring of SIP based endpoints on a camel route that are capable of communication (publish/subscribe) with a SIP Presence Agent.

A SIP Presence Agent is a software entity that facilitates sending of event notifications to event subscribers and receiving of event publications from an event publisher.

The SIP component supports the creation of Producer (Event Publisher) and Consumer (Event Subscriber) endpoints which can
  • register with a Presence Agent
  • In case of producers
    • publish events and notifications to the Presence Agent
  • In case of consumers,
    • subscribe to the presence agent expressing interest in a specific event
    • receive notifications from the presence agent at periodic intervals following state changes.

SIP is a peer-to-peer communication protocol which requires a SIP Stack with a listener to be instantiated on both the SIP Producer and Consumer (using separate ports if using localhost). This is necessary in order to support the handshakes & acknowledgements exchanged between the SIP Stacks during communication.

Configuring the SIP Endpoint URI

The SIP protocol requires a number of headers to be sent along with the request in order to provide information about the sender, intermediaries, circuit details, call sequence etc. These headers may be injected into the camel context registry to be then looked up by the endpoint URI for injection into the publish/subscribe activity by the SIP Producer or Consumer.

The Mandatory Headers included in any SIP call are
  • FromHeader - Mandatory Header containing details regarding the message originator. 
  • ViaHeader(s) - Header tracking the entities which received and forwarded the message to the next hop
  • ContentTypeHeader - Header containing information regarding the encoding of the Payload. SIP is also capable of sending audio and many different binary and encoded formats.
  • CallIdHeader - Header containing the conversation id which is updated each time a producer or consumer participates in a conversation (i.e. uses the same dialog for an interaction).
  • MaxForwardsHeader - Header specifying the maximum number of entities that may be used as 'Via' entities during communication

The Optional Headers included in a SIP call are
  • EventHeader - Used by our Producer (Event Publisher) and Consumer (Event Subscriber) to provide the Presence Agent with details about the Event that they are interested in.
  • ContactHeader - Header containing human readable contact details & information about the message sender 
  • ExpiresHeader - Header containing information regarding message expiry. Expired messages are discarded by the Consumer (Event Subscriber).
  • ExtensionHeader - A Header available for adding any custom information to be sent to the message receiver.   

In addition to headers the SIP URI may be further configured with the following details
  • stackName - Must be a unique value for each endpoint. Name of the SIP Listener listening on a given host/port
  • transport - Communication protocol. Must be TCP or UDP
  • maxMessageSize - Maximum size for a given Message
  • cacheConnections - Whether connections are cached by the SIP Stack
  • automaticDialog - Whether every communication is done through an auto-created Dialog.
  • contentType - the MIME Type for the payload sent on the wire
  • receiveTimeout - The amount of time between sending a message and receiving an acknowledgement from the Presence Agent or Consumer (Event Subscriber). If there is no acknowledgement, retries may be attempted or the client may abort or cancel further retries.
  • msgExpiration - The message expiration value that is matched against the incoming value in the ExpiresHeader. Used to check whether a message is considered as expired.

Given these possible configuration values a SipEndpoint URI may be configured in the following way
// Simple Instantiation
from("sip://johndoe@localhost:5154" +  
     "?stackName=Subscriber" +   
     "&toUser=agent" +
     "&toHost=localhost" +
     "&toPort=5152" +
     "&eventHeaderName=evtHdrName" +
     "&eventId=evtid");

// or using Camel Context Registry based Headers
from("sips://johndoe@localhost:5154" +  
     "?stackName=Subscriber2" +   
     "&fromHeader=#fromDetails" +
     "&toHeader=#toDetails" +
     "&viaHeader=#viaDetails" +
     "&contentTypeHeader=#contentTypeDetails" +
     "&callIdHeader=#callIdDetails" +
     "&maxForwardsHeader=#maxForwardsDetails" +
     "&eventHeader=#eventDetails" +
     "&contactHeader=#contactDetails" +
     "&expiresHeader=#expiresDetails" +
     "&extensionHeader=#extensionDetails" +
     "&eventId=evtid");

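Long hand-concatenated URIs like the ones above are easy to get wrong, for instance with a stray space around an "=" sign. One way to reduce that risk is to assemble the query string from a map. The plain-Java sketch below is only a suggestion; SipUriSketch and build are hypothetical helpers and not part of the Camel SIP component, and the sketch does no URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Camel SIP component.
final class SipUriSketch {

    // Join the base URI and the options as ?key=value&key=value, trimming stray spaces.
    static String build(String baseUri, Map<String, String> options) {
        StringBuilder uri = new StringBuilder(baseUri);
        char separator = '?';
        for (Map.Entry<String, String> option : options.entrySet()) {
            uri.append(separator)
               .append(option.getKey().trim())
               .append('=')
               .append(option.getValue().trim());
            separator = '&';
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("stackName", "Subscriber");
        options.put("toUser", "agent");
        options.put("toHost", "localhost");
        options.put("toPort", "5152");

        System.out.println(build("sip://johndoe@localhost:5154", options));
    }
}
```
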
Creating a Camel SIP Publisher

In the example below, a SIP Publisher is created to send SIP Event publications to
  • a user "agent@localhost:5152". This is the address of the SIP Presence Agent which acts as a broker between the SIP Publisher and Subscriber
  • using a SIP Stack named client
  • using a registry based eventHeader called evtHdrName
  • using a registry based eventId called evtId
  • from a SIP Stack with Listener set up as user2@localhost:3534
  • The Event being published is EVENT_A
  • A Mandatory Header called REQUEST_METHOD is set to Request.PUBLISH, thereby setting up the endpoint as an Event publisher

producerTemplate.sendBodyAndHeader(
    "sip://agent@localhost:5152?stackName=client&eventHeaderName=evtHdrName&eventId=evtid&fromUser=user2&fromHost=localhost&fromPort=3534", 
    "EVENT_A",
    "REQUEST_METHOD", 
    Request.PUBLISH);

Creating a Camel SIP Subscriber

In the example below, a SIP Subscriber is created to receive SIP Event publications sent to
  • a user "johndoe@localhost:5154"
  • using a SIP Stack named Subscriber
  • registering with a Presence Agent user called agent@localhost:5152
  • using a registry based eventHeader called evtHdrName. The evtHdrName contains the Event which is set to "Event_A"
  • using a registry based eventId called evtId
@Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {  
                // Create PresenceAgent
                from("sip://agent@localhost:5152?stackName=PresenceAgent&presenceAgent=true&eventHeaderName=evtHdrName&eventId=evtid")
                    .to("mock:neverland");
                
                // Create Sip Consumer(Event Subscriber)
                from("sip://johndoe@localhost:5154?stackName=Subscriber&toUser=agent&toHost=localhost&toPort=5152&eventHeaderName=evtHdrName&eventId=evtid")
                    .to("log:ReceivedEvent?level=DEBUG")
                    .to("mock:notification");
                
            }
        };
    }

The Camel SIP component also ships with a Presence Agent that is meant to be used for Testing and Demo purposes only. An example of instantiating a Presence Agent is given above. Note that the Presence Agent is set up as a user agent@localhost:5152 and is capable of communicating with both Publisher as well as Subscriber. It has a separate SIP stackName distinct from Publisher as well as Subscriber. While it is set up as a Camel Consumer, it does not actually send any messages along the route to the endpoint "mock:neverland".

Complete Examples

PublishSubscribeTest

Further Details

https://issues.apache.org/activemq/browse/CAMEL-2943

Thursday, August 19, 2010

Customizing Netty Endpoints using Custom Pipeline Factories

I have added an enhancement to the Netty component in Camel 2.5.0 that allows Netty Endpoints to be completely configurable by providing the ability to add custom channel pipeline factories on both Producer and Consumer endpoints.

This capability facilitates the creation of custom channel pipelines. A custom channel pipeline gives the user complete control over the handler/interceptor chain: custom handler(s), encoder(s) and decoder(s) can be inserted in a simple way, without having to specify them in the Netty Endpoint URL.
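
The idea of a named, ordered handler chain can be pictured without Netty. The plain-Java sketch below is only an analogy under assumptions: HandlerChainSketch, addLast and process are hypothetical names that mirror the shape of a pipeline and are not the Netty or Camel API. Each handler is registered under a name and applied to the message in the order it was added.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical analogy only; this is not the Netty ChannelPipeline API.
final class HandlerChainSketch {
    // LinkedHashMap preserves the order in which handlers were added.
    private final Map<String, UnaryOperator<String>> handlers = new LinkedHashMap<>();

    // Append a named handler to the end of the chain.
    HandlerChainSketch addLast(String name, UnaryOperator<String> handler) {
        if (handlers.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        handlers.put(name, handler);
        return this;
    }

    // Pass the message through every handler in insertion order.
    String process(String message) {
        String current = message;
        for (UnaryOperator<String> handler : handlers.values()) {
            current = handler.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        HandlerChainSketch chain = new HandlerChainSketch()
                .addLast("strip-line-delimiter", String::trim)
                .addLast("to-upper-case", text -> text.toUpperCase());

        System.out.println(chain.process("hello netty\n"));
    }
}
```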

Given below is a step by step procedure for incorporating custom channel pipeline factories into a Netty Endpoint

Step 1: Create the custom channel pipeline factory

Examples of a client-side and a server-side channel pipeline factory are given below. In these examples, please note the following:
  • A Producer linked channel pipeline factory must extend the abstract class ClientPipelineFactory
  • A Consumer linked channel pipeline factory must extend the abstract class ServerPipelineFactory
  • The classes can optionally override the getPipeline() method in order to insert custom handler(s), encoder(s) and decoder(s) 

public class SampleClientChannelPipelineFactory extends ClientPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;
        
    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;
            
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));            
        channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));

        return channelPipeline;

    }
        
    public boolean isfactoryInvoked() {
        return invoked;
    }
}

public class SampleServerChannelPipelineFactory extends ServerPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;
        
    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;
            
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));

        return channelPipeline;
    }
        
    public boolean isfactoryInvoked() {
        return invoked;
    }      
}
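
To make the role of the "decoder-DELIM" and "decoder-SD" stages more concrete, here is a minimal plain-Java sketch of what they do together: split an incoming byte stream into lines, strip the delimiter, and decode each line as UTF-8. This is not Netty code; the class LineFrameSketch and its feed() method are hypothetical names I made up for illustration, and the real decoders handle many more cases.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a line-delimited frame decoder followed by a string decoder. */
final class LineFrameSketch {
    private final int maxLineSize;
    // Bytes received so far that do not yet form a complete line
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    LineFrameSketch(int maxLineSize) {
        this.maxLineSize = maxLineSize;
    }

    /** Feeds one chunk of bytes and returns every complete line that the chunk finishes. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<String>();
        for (byte b : chunk) {
            if (b == '\n') {
                String line = new String(pending.toByteArray(), StandardCharsets.UTF_8);
                pending.reset();
                // Strip the delimiter: "\n" is already dropped, also drop a preceding "\r"
                if (line.endsWith("\r")) {
                    line = line.substring(0, line.length() - 1);
                }
                lines.add(line);
            } else {
                if (pending.size() >= maxLineSize) {
                    pending.reset();
                    throw new IllegalStateException("Frame longer than " + maxLineSize + " bytes");
                }
                pending.write(b);
            }
        }
        return lines;
    }
}
```

A line that arrives split across two chunks is only emitted once its delimiter shows up, which is why the framing decoder has to sit in front of the string decoder in the pipeline.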

Step 2: Register the custom channel pipeline factory with the Camel Context

In order to register the custom channel pipeline factories you need to do the following:
protected void addPipelineFactoryToRegistry() throws Exception {
    Registry registry = camelContext.getRegistry();
    // Instantiate the factories created in Step 1
    clientPipelineFactory = new SampleClientChannelPipelineFactory();
    serverPipelineFactory = new SampleServerChannelPipelineFactory();
    // Bind them under the names referenced from the endpoint URIs (#cpf, #spf)
    registry.bind("cpf", clientPipelineFactory);
    registry.bind("spf", serverPipelineFactory);
}

Step 3: Instructing the Netty Endpoint to utilize the Camel Context registered channel pipeline factory

Netty Producer endpoints may now be configured to use the pipeline factories as follows.
private void sendRequest() throws Exception {
    // Send the request and wait for the reply
    response = (String) producerTemplate.requestBody(
        "netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true", 
        "Forrest Gump describing Vietnam...");        
}
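
The #cpf value in the URI above is a reference: a parameter value that starts with # is looked up by name in the registry instead of being used as a literal string. A minimal plain-Java sketch of that idea is shown below. It uses an ordinary Map as a stand-in for the Camel registry, and RegistryRefSketch and resolve() are hypothetical names of mine, not part of Camel.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of resolving "#name" URI parameters against a registry. */
final class RegistryRefSketch {

    /** Returns the URI parameters, with every "#name" value replaced by the registered object. */
    static Map<String, Object> resolve(String uri, Map<String, Object> registry) {
        Map<String, Object> resolved = new LinkedHashMap<String, Object>();
        int queryStart = uri.indexOf('?');
        if (queryStart < 0) {
            return resolved; // no parameters at all
        }
        for (String pair : uri.substring(queryStart + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            String[] keyValue = pair.split("=", 2);
            String value = keyValue.length > 1 ? keyValue[1] : "";
            if (value.startsWith("#")) {
                // A reference: look the object up by name
                String name = value.substring(1);
                if (!registry.containsKey(name)) {
                    throw new IllegalArgumentException("No entry named '" + name + "' in the registry");
                }
                resolved.put(keyValue[0], registry.get(name));
            } else {
                // A plain literal value
                resolved.put(keyValue[0], value);
            }
        }
        return resolved;
    }
}
```

With a registry that maps "cpf" to a factory instance, resolving the producer URI yields that instance for clientPipelineFactory and the plain string "true" for textline.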

Netty Consumer endpoints may be configured as follows
protected Context createJndiContext() throws Exception {
    Properties properties = new Properties();

    // jndi.properties is optional
    InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
    if (in != null) {
        log.debug("Using jndi.properties from classpath root");
        properties.load(in);
    } else {            
        properties.put("java.naming.factory.initial", "org.apache.camel.util.jndi.CamelInitialContextFactory");
    }
    return new InitialContext(new Hashtable(properties));
}

public void createCamelRoute() throws Exception {
    CamelContext context = new DefaultCamelContext(new JndiRegistry(createJndiContext()));
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&textline=true")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        exchange.getOut().setBody("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'");                           
                    }
                });                
        }
    });
    context.start();
}

Sample code

NettyCustomPipelineFactoryAsynchTest.java
NettyCustomPipelineFactorySynchTest.java

Further details
https://issues.apache.org/activemq/browse/CAMEL-2713

Tuesday, July 27, 2010

Developed an Apache Shiro based component for applying security (authentication/authorization) along Camel Routes

Last week, I submitted a new Camel security component, based on the Apache Shiro project, to provide authentication and authorization support on Camel routes. This new component allows user and role based security support to be applied to different segments of a camel route.

Shiro Security Basics

Shiro security is applied on a route using a Camel Policy. A Policy in Camel uses the strategy pattern to apply interceptors to Camel Processors. It offers the ability to apply cross-cutting concerns (for example security, transactions etc.) to sections/segments of a camel route.

To employ Shiro security on a camel route, a ShiroSecurityPolicy object must be instantiated with security configuration details (including users, passwords, roles etc). This object must then be applied to a camel route. This ShiroSecurityPolicy object may also be registered in the Camel registry (JNDI or ApplicationContextRegistry) and then utilized on other routes in the Camel Context.

Configuration details are provided to the ShiroSecurityPolicy using an Ini file (properties file) or an Ini object. The Ini file is a standard Shiro configuration file containing user/role details as shown below:

A Shiro Ini File containing configuration details
[users]
# user 'ringo' with password 'starr' and the 'sec-level1' role
ringo = starr, sec-level1
george = harrison, sec-level2
john = lennon, sec-level3
paul = mccartney, sec-level3

[roles]
# 'sec-level3' role has all permissions, indicated by the 
# wildcard '*'
sec-level3 = *

# The 'sec-level2' role has every permission within 'zone1',
# indicated by the wildcard '*'
sec-level2 = zone1:*

# The 'sec-level1' role has only the 'readonly' permission
# within 'zone1'
sec-level1 = zone1:readonly:*

Applying Shiro Authentication on a Camel Route

An example of using the information in the Ini file to apply authentication security is shown below. In the following example, the ShiroSecurityPolicy permits incoming message exchanges containing an encrypted SecurityToken in the Message Header to proceed only after successful authentication. The SecurityToken object contains the Username/Password details that are used to determine whether the user is a valid user.

protected RouteBuilder createRouteBuilder() throws Exception {
        final ShiroSecurityPolicy securityPolicy = 
            new ShiroSecurityPolicy("./src/test/resources/securityconfig.ini", passPhrase);
        
        return new RouteBuilder() {
            public void configure() {
                onException(UnknownAccountException.class).
                    to("mock:authenticationException");
                onException(IncorrectCredentialsException.class).
                    to("mock:authenticationException");
                onException(LockedAccountException.class).
                    to("mock:authenticationException");
                onException(AuthenticationException.class).
                    to("mock:authenticationException");
                
                from("direct:secureEndpoint").
                    to("log:incoming payload").
                    policy(securityPolicy).
                    to("mock:success");
            }
        };
    }
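
The policy(securityPolicy) step in the route above wraps the processing that follows it, so the check runs before "mock:success" is ever reached. A minimal plain-Java sketch of that wrapping idea is given below. The Step and Policy interfaces and the header names "user" and "password" are hypothetical stand-ins that I made up for illustration; they are not Camel's or Shiro's actual types, and the real component reads an encrypted token rather than plain headers.

```java
import java.util.Map;

/** Hypothetical illustration of a policy that wraps the next step of a route. */
final class PolicySketch {

    /** Stand-in for a route step that turns a message into a result. */
    interface Step {
        String process(Map<String, String> headers, String body);
    }

    /** Stand-in for a policy: it decorates the next step with extra behaviour. */
    interface Policy {
        Step wrap(Step next);
    }

    /** Builds a policy that lets a message through only if its credentials are known. */
    static Policy authentication(final Map<String, String> knownUsers) {
        return next -> (headers, body) -> {
            String user = headers.get("user");
            String password = headers.get("password");
            if (user == null || !knownUsers.containsKey(user)) {
                throw new SecurityException("Unknown account");
            }
            if (!knownUsers.get(user).equals(password)) {
                throw new SecurityException("Incorrect credentials");
            }
            // Credentials are valid: hand over to the wrapped step
            return next.process(headers, body);
        };
    }
}
```

The wrapped step stays unaware of the check, which is what makes a policy suitable for cross-cutting concerns such as security.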

Applying Shiro Authorization on a Camel Route

The example below shows how authorization can be applied on a camel route by associating a Permissions List with the ShiroSecurityPolicy. The Permissions List specifies the permissions necessary for the user to proceed with the execution of the route segment. If the user does not have the proper permission set, the request is not authorized to continue any further.

protected RouteBuilder createRouteBuilder() throws Exception {
        List<Permission> permissionsList = new ArrayList<Permission>();
        Permission permission = new WildcardPermission("zone1:readwrite:*");
        permissionsList.add(permission);
        
        final ShiroSecurityPolicy securityPolicy = 
             new ShiroSecurityPolicy("./src/test/resources/securityconfig.ini", passPhrase, true, permissionsList);
        
        return new RouteBuilder() {
            public void configure() {
                onException(CamelAuthorizationException.class).
                    to("mock:authorizationException");
                
                from("direct:secureEndpoint").
                    to("log:incoming payload").
                    policy(securityPolicy).
                    to("mock:success");
            }
        };
    }
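
To see which roles from the Ini file would satisfy the required permission "zone1:readwrite:*", it helps to walk through how wildcard permissions are compared. The sketch below is a simplified plain-Java re-implementation of that comparison as I understand it from Shiro's documented WildcardPermission behaviour; it is not Shiro's code, and WildcardPermissionSketch and implies() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified comparison of colon-separated wildcard permissions. */
final class WildcardPermissionSketch {

    /** Splits "a:b,c:*" into parts [a], [b, c], [*]. */
    private static List<Set<String>> parse(String permission) {
        List<Set<String>> parts = new ArrayList<Set<String>>();
        for (String part : permission.split(":")) {
            parts.add(new HashSet<String>(Arrays.asList(part.split(","))));
        }
        return parts;
    }

    /** Returns true if the granted permission covers the required one. */
    static boolean implies(String granted, String required) {
        List<Set<String>> have = parse(granted);
        List<Set<String>> need = parse(required);
        int i = 0;
        for (; i < need.size(); i++) {
            if (i >= have.size()) {
                return true; // granted permission is shorter: the remaining parts are implied
            }
            Set<String> part = have.get(i);
            if (!part.contains("*") && !part.containsAll(need.get(i))) {
                return false; // this part neither matches nor is a wildcard
            }
        }
        // Granted permission is longer: the extra parts must all be wildcards
        for (; i < have.size(); i++) {
            if (!have.get(i).contains("*")) {
                return false;
            }
        }
        return true;
    }
}
```

Under these rules the roles sec-level3 (*) and sec-level2 (zone1:*) cover "zone1:readwrite:*", while sec-level1 (zone1:readonly:*) does not, so a request from ringo would be rejected by the route above.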

Sending requests to routes secured by a ShiroSecurityPolicy

Messages and Message Exchanges sent along the camel route where the security policy is applied need to be accompanied by a SecurityToken in the Exchange Header. The SecurityToken is an encrypted object that holds a Username and Password. By default the SecurityToken is encrypted using 128-bit AES; this can be changed to any cipher of your choice.
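
As a rough illustration of what encrypting such a token with a 128-bit AES key involves, here is a minimal sketch using only the JDK's javax.crypto classes. It is not the component's implementation: the class TokenCipherSketch, the "user:password" token format and the choice of AES/GCM are all assumptions of mine, and the actual component may use a different cipher mode and serialization.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical illustration of sealing a credentials token with a 16-byte (128-bit) AES key. */
final class TokenCipherSketch {
    private static final int IV_BYTES = 12;  // nonce length used with GCM in this sketch
    private static final int TAG_BITS = 128; // authentication tag length

    /** Encrypts the token and returns the random IV followed by the ciphertext. */
    static byte[] encrypt(String token, byte[] key16) {
        try {
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key16, "AES"), new GCMParameterSpec(TAG_BITS, iv));
            byte[] cipherText = cipher.doFinal(token.getBytes(StandardCharsets.UTF_8));
            byte[] sealed = new byte[IV_BYTES + cipherText.length];
            System.arraycopy(iv, 0, sealed, 0, IV_BYTES);
            System.arraycopy(cipherText, 0, sealed, IV_BYTES, cipherText.length);
            return sealed;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not encrypt token", e);
        }
    }

    /** Reverses encrypt(); fails if the key is wrong or the data was tampered with. */
    static String decrypt(byte[] sealed, byte[] key16) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key16, "AES"),
                    new GCMParameterSpec(TAG_BITS, sealed, 0, IV_BYTES));
            byte[] plain = cipher.doFinal(sealed, IV_BYTES, sealed.length - IV_BYTES);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not decrypt token", e);
        }
    }
}
```

Both sides need the same pass phrase, which is why the same passPhrase value is handed to the security policy and to the code that injects the token.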

Given below is an example of how a request may be sent using a ProducerTemplate in Camel along with a SecurityToken

    @Test
    public void testSuccessfulShiroAuthenticationWithNoAuthorization() throws Exception {        
        // Valid credentials for the user 'ringo' defined in the Ini file
        ShiroSecurityToken shiroSecurityToken = new ShiroSecurityToken("ringo", "starr");
        TestShiroSecurityTokenInjector shiroSecurityTokenInjector = 
            new TestShiroSecurityTokenInjector(shiroSecurityToken, passPhrase);
        
        // The exchange is expected to pass authentication and reach the success endpoint
        successEndpoint.expectedMessageCount(1);
        failureEndpoint.expectedMessageCount(0);
        
        template.send("direct:secureEndpoint", shiroSecurityTokenInjector);
        
        successEndpoint.assertIsSatisfied();
        failureEndpoint.assertIsSatisfied();
    } 

Complete Examples

ShiroAuthenticationTest
ShiroAuthorizationTest

Further Details

The submission for this component is available for comment and review at
http://issues.apache.org/activemq/browse/CAMEL-2779

Wednesday, March 31, 2010

Developed a Socket based communication component (TCP/UDP) for Camel using JBoss Netty

Last week, I performed a major commit to the Apache Camel code base with a new socket communication component based on the JBoss Netty community offering (available under an Apache 2.0 license). This new component supports both TCP and UDP (multicast) communication, including SSL support over both protocols.

The component has several options and allows fine-grained control over a number of TCP/UDP communication parameters (buffer sizes, keepAlives, tcpNoDelay etc) and offers both In-Only and In-Out communication on a Camel route. The camel-netty component also supports several different types of payloads including Binary, XML, Text and serialized Objects.

http://issues.apache.org/activemq/browse/CAMEL-2371

The component works as follows

Camel Netty routes using a UDP endpoint in request-reply mode
context.addRoutes(new RouteBuilder() {
  public void configure() {
    from("netty:udp://localhost:5155?sync=true")
      .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
          // Populate the incoming Poetry object and return it as the reply
          Poetry poetry = (Poetry) exchange.getIn().getBody();
          poetry.setPoet("Dr. Sarojini Naidu");
          exchange.getOut().setBody(poetry);
        }
      });
  }
});

Camel Netty routes using a TCP endpoint in one-way mode
context.addRoutes(new RouteBuilder() {
public void configure() {
  from("netty:tcp://localhost:5150")
    .to("mock:result");   
}
});

In the above examples, note that
  • the first Camel route uses the UDP protocol with a datagram socket listener on port 5155 and, because of sync=true, responds to the invoking client with a populated serializable Poetry response object
  • the second Camel route uses the TCP protocol with a socket listener set up on port 5150 and is designed to receive a one-way request from an invoking client

There are a number of URI parameter settings that do the following
  • keepAlive: boolean setting to ensure socket is not closed due to inactivity
  • tcpNoDelay: boolean setting to improve TCP protocol performance
  • broadcast: boolean setting to choose Multicast over UDP
  • connectTimeoutMillis: time to wait for a socket connection to be available
  • receiveTimeoutMillis: time to wait for a response to be received on a connection
  • reuseAddress: boolean setting to facilitate socket multiplexing
  • sync: boolean setting to set endpoint as one-way or request-response
  • ssl: boolean setting to specify whether SSL encryption is applied to this endpoint
  • sendBufferSize: the TCP/UDP buffer sizes to be used during outbound communication
  • receiveBufferSize: the TCP/UDP buffer sizes to be used during inbound communication
  • corePoolSize: the number of allocated threads at startup. Defaults to 10
  • maxPoolSize: the maximum number of threads that may be allocated to this endpoint. Defaults to 100
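
Several of these settings (keepAlive, tcpNoDelay, reuseAddress and the buffer sizes) correspond to ordinary socket options. As a point of reference, the sketch below shows the equivalent calls on a plain java.net.Socket. This only illustrates the underlying options and assumes that the mapping is direct; SocketOptionsSketch and apply() are hypothetical names, and the component itself configures Netty channels rather than raw sockets.

```java
import java.io.IOException;
import java.net.Socket;

/** Hypothetical illustration of the socket-level options behind some of the URI parameters. */
final class SocketOptionsSketch {

    /** Applies the options to an unconnected socket and reports what the socket reads back. */
    static String apply(boolean keepAlive, boolean tcpNoDelay, boolean reuseAddress,
                        int sendBufferSize, int receiveBufferSize) {
        try (Socket socket = new Socket()) {
            socket.setKeepAlive(keepAlive);       // keepAlive: probe idle connections
            socket.setTcpNoDelay(tcpNoDelay);     // tcpNoDelay: disable Nagle's algorithm
            socket.setReuseAddress(reuseAddress); // reuseAddress: allow rebinding the address
            // Buffer sizes are hints; the operating system may adjust them
            socket.setSendBufferSize(sendBufferSize);
            socket.setReceiveBufferSize(receiveBufferSize);
            return "keepAlive=" + socket.getKeepAlive()
                    + ", tcpNoDelay=" + socket.getTcpNoDelay()
                    + ", reuseAddress=" + socket.getReuseAddress();
        } catch (IOException e) {
            throw new IllegalStateException("Could not configure socket", e);
        }
    }
}
```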

Codec Handlers and SSL Keystores can be set via a JNDI Registry that is associated with the Camel Context. The values that can be passed in are the following
  • passphrase: password setting to use in order to encrypt/decrypt payloads sent using SSL
  • keyStoreFile: Client side certificate keystore to be used for encryption
  • trustStoreFile: Server side certificate keystore to be used for encryption
  • sslHandler: Reference to a class that could be used to return an SSL Handler
  • encoder: A custom Handler class that can be used to perform special marshalling of outbound payloads. Must implement org.jboss.netty.channel.ChannelDownstreamHandler
  • decoder: A custom Handler class that can be used to perform special unmarshalling of inbound payloads. Must implement org.jboss.netty.channel.ChannelUpstreamHandler
  • handler: A custom Handler class that can be used to perform custom processing of Netty events triggered during communication. Must extend org.jboss.netty.channel.SimpleChannelHandler

Some other examples of using the camel-netty component are

Using SSL with a Netty based endpoint
JndiRegistry registry = new JndiRegistry(createJndiContext());
// Bind the values under the names referenced from the endpoint URI (#password, #ksf, #tsf)
registry.bind("password", "changeit");
registry.bind("ksf", new File("src/test/resources/keystore.jks"));
registry.bind("tsf", new File("src/test/resources/keystore.jks"));
context.createRegistry(registry);
context.addRoutes(new RouteBuilder() {
  public void configure() {
    from("netty:tcp://localhost:5150?sync=true&ssl=true&passphrase=#password&keyStoreFile=#ksf&trustStoreFile=#tsf")
      .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
          exchange.getOut().setBody("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.");                           
        }
      });
  }
});

Sunday, January 3, 2010

Developed an Apache Lucene component in Camel to perform indexed searches on a route

I have just submitted a new Lucene Component to the Apache Camel community that
  • builds a searchable index of documents when payloads are sent to the Lucene Endpoint
  • facilitates performing indexed searches in Camel when the payload header contains a QUERY.

http://issues.apache.org/activemq/browse/CAMEL-1472

The component works as follows

Creating a Searchable Document Index in Lucene using Camel
context.addRoutes(new RouteBuilder() {
    public void configure() {
        from("direct:start")
            .to("lucene:stdQuotesIndex:insert?"
                + "analyzer=#stdAnalyzer"
                + "&indexDir=#std&srcDir=#load_dir")
            .to("mock:result");
    }
});


where each URI parameter setting does the following

  • analyzer: can be any valid Lucene Analyzer implementation (StandardAnalyzer, WhitespaceAnalyzer, StopAnalyzer etc.)
  • srcDir: an optional directory from which Text or XML documents are loaded when the endpoint or Lucene index is created. Once created, the index can take any exchange body and store its contents in the index.


Important Note: Lucene stipulates that the index be created upfront and then used in read-only mode for any later querying. Hence the index cannot be in flux during query processing. This requires the Lucene Producer to have received its payloads and created the index before any queries can be run against it.
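
The build-first, query-later pattern described in the note can be pictured with a toy inverted index. The plain-Java sketch below is only a conceptual illustration: it is far simpler than Lucene, does no scoring or real analysis, and InvertedIndexSketch with its add() and search() methods are hypothetical names of mine.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical toy inverted index: term -> ids of the documents that contain it. */
final class InvertedIndexSketch {
    private final List<String> documents = new ArrayList<>();
    private final Map<String, Set<Integer>> postings = new HashMap<>();

    /** Indexing phase: stores the document and records each of its terms. */
    int add(String text) {
        int id = documents.size();
        documents.add(text);
        // Crude stand-in for an analyzer: lower-case and split on non-word characters
        for (String term : text.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (term.isEmpty()) {
                continue;
            }
            postings.computeIfAbsent(term, key -> new TreeSet<Integer>()).add(id);
        }
        return id;
    }

    /** Query phase: returns every stored document that contains the term. */
    List<String> search(String term) {
        List<String> hits = new ArrayList<>();
        Set<Integer> ids = postings.get(term.toLowerCase(Locale.ROOT));
        if (ids != null) {
            for (int id : ids) {
                hits.add(documents.get(id));
            }
        }
        return hits;
    }
}
```

All calls to add() happen first; search() then only reads the term-to-document map, which mirrors the separation between the insert and query endpoints.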

Since the URI settings cannot be passed directly (they are object references or would break the URI format), I pass them using the JNDI registry associated with the Default Component (example shown below).

Providing URI values for Analyzer and Initial Load Directory

@Override
protected JndiRegistry createRegistry() throws Exception {
    JndiRegistry registry = new JndiRegistry(createJndiContext());
    registry.bind("std", new File("target/stdindexDir"));
    registry.bind("load_dir", new File("src/test/resources/sources"));
    registry.bind("stdAnalyzer", new StandardAnalyzer(Version.LUCENE_CURRENT));

    return registry;
}


I have also added a Query Endpoint and a Query Processor that are fully capable of running any query (including wildcards etc.) against a Lucene Document Index and presenting the results in a serialized Hits object (see the example provided below for use).

Performing searches using a Query Endpoint

context.addRoutes(new RouteBuilder() {
    public void configure() {
        // Run the query given in the QUERY header against the index
        from("direct:start")
            .setHeader("QUERY", constant("Seinfeld"))
            .to("lucene:searchIndex:query?"
                + "analyzer=#whitespaceAnalyzer"
                + "&indexDir=#whitespace"
                + "&maxHits=20")
            .to("direct:next");

        // Log the returned hits, then pass the exchange on
        from("direct:next")
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    Hits hits = exchange.getIn().getBody(Hits.class);
                    printResults(hits);
                }

                private void printResults(Hits hits) {
                    LOG.debug("Number of hits: " + hits.getNumberOfHits());
                    for (int i = 0; i < hits.getNumberOfHits(); i++) {
                        LOG.debug("Hit " + i + " Index Location:"
                            + hits.getHit().get(i).getHitLocation());
                        LOG.debug("Hit " + i + " Score:"
                            + hits.getHit().get(i).getScore());
                        LOG.debug("Hit " + i + " Data:"
                            + hits.getHit().get(i).getData());
                    }
                }
            })
            .to("mock:searchResult");
    }
});