Thursday, March 31, 2011

FuseSource hosts first CamelOne 2011 Event in Washington DC

Fellow Camel riders,

If you are a resident of Wasington DC or plan on pitching your tents there in the last week of May, you might want to check out this event being held by FuseSource. There will be several technical training sessions on Apache Camel, Servicemix, CXF and ActiveMQ.

The event also features presentations by noted speakers such as Gregory Hohpe and James Strachan.

You might also want to attend Apache Camel training sessions by my fellow Camel Committers Jon Anstey and Claus Ibsen. My FuseSource colleague and Apache Karaf committer Adrian Trenaman will be conducting training sessions on Apache CXF and Apache Servicemix.

All in all it should be a cool event to meet (or is it integrate) with other Camel riders and get a small caravan going!!!

Wish I could join... Maybe sometime when the caravan rides through Dallas (Hint, Hint, nudge, nudge, say no more, say no more...)

Please find a link to the event below
http://fusesource.com/camelone2011

Wednesday, February 9, 2011

Applying multiple route policies on Camel Routes

Available as of Camel 2.7
I have recently added a enhancement in Camel 2.7 to apply multiple route policies concurrently on a Camel route.

It is often necessary to apply multiple policies such as throttling, scheduling, ACL's etc to routes concurrently based on the organizational integration requirements. In earlier versions of Camel the route policy was limited to one policy per route.

In the example below, the route testRoute has a startPolicy and throttlePolicy applied concurrently. Both policies are applied as necessary on the route




   
   
        

    

        

   

    
        
        
    


Complete Examples
RoutePoliciesTest.java
RoutePoliciesRefTest.java
SpringRoutePoliciesTest.xml

Further Details
https://issues.apache.org/jira/browse/CAMEL-3254

Friday, January 14, 2011

Definitive book on Camel - "Camel in Action" released

My colleagues Claus Ibsen and Jon Anstey have released an excellent book on Apache Camel. I encourage anyone interested in learning about Camel to check out this book.

I have had the pleasure of working with Claus and Jon at FuseSource and Apache over the last several months and years and recommend this book for readers looking for a definitive guide on Camel written by authorities on the subject.

Please find the links to this book below

http://www.manning.com/ibsen/
http://camel.apache.org/2010/12/25/camel-in-action-is-done.html

I am sure readers and camel users at every level will benefit greatly through this wonderful body of work.

Tuesday, December 28, 2010

Developed a new Routebox component encapsulating camel contexts and routes as camel endpoints

A couple of weeks ago, I contributed an interesting and unique new Camel component called Routebox. Please find the details regarding the Routebox component below.

Before we proceed, A small note about Camel routes

Camel routes are crafted using Java DSL or Spring XML to link URI endpoints representing protocols, technologies and infrastructure components based on well established integration patterns and integration requirements. The routes themselves are discrete, standalone and isolated. Routes are hosted by a camel context which is responsible for lifecycle management and administrative control of hosted routes. Routes are in and of themselves not layered in any way and the same is true of camel contexts.

Now let us identify the need for a Routebox

In integration environments with complex and multi-faceted integration needs along with a wide array of endpoint technologies it is often necessary to craft an integration solution by creating a sense of layering among camel routes effectively organizing them into


Coarse grained routes - routes with Routebox endpoints that facade a collection of inner or lower level routes that are
  • Application specific --> HR routes, Sales routes etc
  • Objective specific --> Shipping routes, Fulfillment routes etc
  • Technology specific --> Database routes, Caching routes, polling routes etc
Fine grained routes - routes that execute a specific business and/or integration pattern.

Requests sent to Routebox endpoints on coarse grained routes can then delegate requests to inner fine grained routes to achieve a specific integration objective via a set of fine grained routes, collect the final inner result, and continue to progress to the next step along the coarse-grained route.

The idea behind a Routebox is to act as a blackbox for a collection of fine grained routes and direct payloads received from the Routebox endpoint to specific inner fine grained routes based on a user defined internal routing strategy or a map that matches headers in payloads to determine the inner routes to which payloads are directed.

This new component facilitates encapsulation and indirection of requests received via a routebox consumer endpoint or producer endpoint on a coarse grained camel route to a set of inner routes declared in an inner camel context.

Further Routebox details

A Routebox endpoint consists of a Routebox URI of the type

routebox:routebox-name?...

It is not necessary for producers do not need to know the inner route endpoint URI and they can simply invoke the Routebox URI endpoint.

The Routebox can be applied on a route as a route consumer or as a route producer.

Route Producer endpoints are of two types
  • Endpoints that direct payloads to a routebox consumer
  • Endpoints that host an inner camel context and direct payloads to inner routes hosted by the camel context

The Routebox URI has a query parameter (sendToConsumer) whose value specifies the type of producer endpoint.

The Routebox itself is currently JVM bound and uses an internal protocol ((SEDA or Direct) to receive incoming requests. The choice of internal protocol is controlled by a query parameter (innerProtocol).

The support for SEDA provides automatic support for asynchronous requests to be sent to the routebox endpoint. Similarly support for the Direct protocol provides support for synchronous requests.

The inner routes in the routebox supports all manner of valid camel endpoints. The inner context is a managed entity controlled by the routebox endpoint. Starting or stopping the routebox endpoint automatically triggers the stoppage of any inner routes and removal of the associated inner camel context for routebox consumers and producers hosting inner contexts and routes.

The dispatching to inner routes is determined by a routebox designer supplied dispatch strategy (a user defined class that implements org.apache.camel.component.routebox.strategy.RouteboxDispatchStrategy). This gives complete control to the routebox designer to direct incoming payloads to different fine grained routes. Alternatively the routbox designer can supply a HashMap (HashMap) consisting of a header value(key) and endpointUri(value). The user can then use the exchange header to send keys that then dictate the fine grained route to which the payload is sent.

Using the Routebox

A Routebox may be created by supplying the Routebox URI with one of the 2 options given below via the Camel Registry.
  • A custom camel context 
  • A List of Routebuilders to be launched in a camel context automatically created by the Routebox

Routebox Example using a List of Routebuilders

Step 1: Create a Registry containing a list of routebuilders and a dispatch strategy. Then add the registry to the Camel Context

protected Context createJndiContext() throws Exception {
        Properties properties = new Properties();

        // jndi.properties is optional
        InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
        if (in != null) {
            log.debug("Using jndi.properties from classpath root");
            properties.load(in);
        } else {
            // set the default initial factory
            properties.put("java.naming.factory.initial", "org.apache.camel.util.jndi.CamelInitialContextFactory");
        }
        return new InitialContext(new Hashtable(properties));
    }

    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = new JndiRegistry(createJndiContext());
        
        // Wire the routeDefinitions & dispatchStrategy to the outer camelContext 
        // where the routebox is declared
        List routes = new ArrayList();
        routes.add(new SimpleRouteBuilder());
        registry.bind("registry", createInnerRegistry());
        registry.bind("routes", routes);
        registry.bind("strategy", new SimpleRouteDispatchStrategy());
        
        return registry;
    }

    private JndiRegistry createInnerRegistry() throws Exception {
        JndiRegistry innerRegistry = new JndiRegistry(createJndiContext());
        BookCatalog catalogBean = new BookCatalog();
        innerRegistry.bind("library", catalogBean);        
        
        return innerRegistry;
    }

    protected CamelContext createCamelContext() throws Exception {
        return new DefaultCamelContext(createRegistry());
    }

Step 2: Create a new routebox route to be launched in the camel context. Note that the # entries in the routeboxUri are matched to the created inner registry, routebuilder list and dispatchStrategy in the CamelContext Registry. Note that all routebuilders and associated routes are launched in the routebox created inner context

private String routeboxUri = "routebox:multipleRoutes?innerRegistry=#registry&routeBuilders=#routes&dispatchStrategy=#strategy";

    public void testRouteboxDirectAsyncRequests() throws Exception {
        CamelContext context = createCamelContext();
        template = new DefaultProducerTemplate(context);
        template.start();        
     
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from(routeboxUri)
                    .to("log:Routes operation performed?showAll=true");
            }
        });
        context.start();

        // Now use the ProducerTemplate to send the request to the routebox
        template.requestBodyAndHeader(routeboxUri, book, "ROUTE_DISPATCH_KEY", operation);
    }

Complete Examples
RouteboxDefaultContextAndRouteBuilderTest.java
RouteboxDirectProducerOnlyTest.java
RouteboxDirectTest.java
RouteboxDispatchMapTest.java
RouteboxSedaTest.java
SimpleRouteBuilder.java
RouteboxDemoTestSupport.java
BookCatalog.java
Book.java

Further Details
https://issues.apache.org/jira/browse/CAMEL-3285

Tuesday, October 26, 2010

Weighted Round-Robin and Random Load balancing further simplified in Camel

I have further simplified the weighted round-robin and random load balancing support in Camel 2.6 based on comments and feedback from Claus Ibsen (http://davsclaus.blogspot.com/).

The changes are primarily in 2 areas

1> In Camel 2.5, in the case where the distributionRatio did not match the number
     of specified load balancing processors, I had opted to use best effort load balancing
     at runtime along with log warnings. In Camel 2.6, this behavior has now been
     changed to throw an exception during route startup to indicate that the
     distributionRatio does not match the number of processors.

2> In Camel 2.5 , the distributionRatio was passed to DSL or Spring XML as a List 
     as shown in
         a> http://opensourceknowledge.blogspot.com/2010/10/added-support-for-weighted-round-robin.html
         b> Camel Documentation (http://camel.apache.org/load-balancer.html).
               I have modified this in Camel 2.6 based on Claus' recommendation and made 

               it easier for the user by changing the distribution ratio to a String that expects
               an input of integer weights separated by a delimiter. The delimiter can also be
               influenced using distributionRatioDelimiter String (the default delimiter being ",").

Given below are examples of how this simplifies the DSL for Camel 2.6

public void testRoundRobin() throws Exception {

        x.expectedMessageCount(5);
        y.expectedMessageCount(2);
        z.expectedMessageCount(1);

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                
                // START SNIPPET: example
                from("direct:start").loadBalance().
                weighted(true, "4,2,1").to("mock:x", "mock:y", "mock:z");
                // END SNIPPET: example
            }
        });
        context.start();
        
        sendMessages(1, 2, 3, 4, 5, 6, 7, 8);
        
        assertMockEndpointsSatisfied();
        x.expectedBodiesReceived(1, 4, 6, 7, 8);
        y.expectedBodiesReceived(2, 5);
        z.expectedBodiesReceived(3);
    }

For Spring XML examples, please check out the following links

weightedRoundRobinLoadBalance.xml
weightedRandomLoadBalance.xml

Thursday, October 14, 2010

Added support for Weighted Round-Robin and Weighted Random Load balancing in Camel

I have just added a new capability in Camel 2.5, to offer Weighted Round-Robin and Weighted Random Load Balancer support.

Camel currently offers several different Load Balancing policies out of the box such as
  • Round-Robin
  • Random
  • Sticky
  • Topic Based and
  • Fail-over
However, in many enterprise environments where server nodes of unequal processing power & performance characteristics are utilized to host services and processing endpoints, it is frequently necessary to distribute processing load based on their individual server capabilities so that some endpoints are not unfairly burdened with requests. Obviously simple round-robin or random load balancing do not alleviate problems of this nature. A Weighted Round-Robin and/or Weighted Random load balancer is invaluable in this regard.

The weighted load balancing policy allows you to specify a processing load distribution ratio for each server with respect to others. You can specify this as a positive processing weight for each server. A larger number indicates that the server can handle a larger load. The weight is utilized to determine the payload distribution ratio to different processing endpoints with respect to others.

In addition to the weight, endpoint selection is then further refined based on an algorithm (round-robin/random).

Weighted Round Robin Load Balancing

Given below is a Camel example of the weighted round-robin capability in action. Note that there are 3 mock camel endpoints (mock:x, mock:y and mock:z). The important thing is to associate a distribution ratio (list of server weights) and wire it into a load balancer on the route  along with a boolean value (specifying whether the distribution algorithm is round-robin (true in this case).

In the example below we expect endpoints mock:x, mock:y and mock:z to receive 4, 2 and 1 messages  out of every 7 messages respectively. In addition we expect each endpoint to receive messages in strict round-robin order until its weight no longer permits further delivery of messages until the next cycle.

public void testRoundRobin() throws Exception {

        x.expectedMessageCount(5);
        y.expectedMessageCount(2);
        z.expectedMessageCount(1);

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                ArrayList distributionRatio = new ArrayList();
                distributionRatio.add(4);
                distributionRatio.add(2);
                distributionRatio.add(1);
                
                // START SNIPPET: example
                from("direct:start").loadBalance().
                weighted(true, distributionRatio).to("mock:x", "mock:y", "mock:z");
                // END SNIPPET: example
            }
        });
        context.start();
        
        sendMessages(1, 2, 3, 4, 5, 6, 7, 8);
        
        assertMockEndpointsSatisfied();
        x.expectedBodiesReceived(1, 4, 6, 7, 8);
        y.expectedBodiesReceived(2, 5);
        z.expectedBodiesReceived(3);
    }

Weighted Random Load Balancing

Given below is a Camel example of the weighted round-robin capability in action. Note that there are 3 mock camel endpoints (mock:x, mock:y and mock:z). Again in this case, the important thing is to associate a distribution ratio (list of server weights) and wire it into a load balancer on the route  along with a boolean value (specifying whether the distribution algorithm is round-robin (false in this case).

In the example below we expect endpoints mock:x, mock:y and mock:z to receive 4, 2 and 1 messages out of every 7 messages respectively. In addition we expect each endpoint to receive messages in a random order until its weight no longer permits further delivery of messages until the next cycle.

public void testRandom() throws Exception {

        x.expectedMessageCount(4);
        y.expectedMessageCount(2);
        z.expectedMessageCount(1);

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                ArrayList distributionRatio = new ArrayList();
                distributionRatio.add(4);
                distributionRatio.add(2);
                distributionRatio.add(1);
                
                // START SNIPPET: example
                from("direct:start").loadBalance().
                weighted(false, distributionRatio).to("mock:x", "mock:y", "mock:z");
                // END SNIPPET: example
            }
        });
        context.start();
        
        sendMessages(1, 2, 3, 4, 5, 6, 7);
        
        assertMockEndpointsSatisfied();
    }

Complete Examples
WeightedRoundRobinLoadBalanceTest
WeightedRandomLoadBalanceTest

Further Details
https://issues.apache.org/activemq/browse/CAMEL-3197

Tuesday, September 28, 2010

Developed a capability in the Camel Quartz Component to schedule route activation, de-activation, suspension and resumption

I have just added a feature in Camel-quartz (Camel version 2.5) to facilitate schedule based route activation, de-activation, suspension and resumption.

In enterprise environments, it is frequently necessary to schedule routes to run at certain times during the day or night (for e.g. Routes tied to Batch jobs...).

Scheduling of routes typically involves  the following capabilities

  • Route activation - Starting a route a given start time if the route is in a stopped state awaiting activation.
  • Route de-activation - Shutting down an otherwise active and started route at a given time. 
  • Route suspension - Simply de-activating the route consumer endpoint URI declared on the from(...) section of the route from listening on a given port. The route is still considered as started, however, clients will not be able to send requests along the route. 
  • Route resumption - Resuming the listener on a formerly suspended route consumer endpoint URI. This route is ready to accept requests following route resumption and client requests will be accepted by the route consumer to be forwarded along the route.

The enforcement of the schedule is done via a ScheduledRoutePolicy that must be wired into the route. The ScheduledRoutePolicy currently supports 2 variants
  1. SimpleScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using dates, repeat counts and repeat intervals.
  2. CronScheduledRoutePolicy: Where the rules for route activation, de-activation, suspension and resumption are provided using Cron Expressions.
SimpleScheduledRoutePolicy

In order to use a SimpleScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.SimpleScheduledRoutePolicy.  The following property values must be set on it to be useful
  • For Starting routes on a Schedule
    • routeStartDate - the initial scheduled Date and time for route start
    • routeStartRepeatCount - no of times to repeat the job
    • routeStartRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
  • For Stopping routes on a Schedule
    • routeStopDate - the initial scheduled Date and time for route stop
    • routeStopRepeatCount - no of times to repeat the job
    • routeStopRepeatInterval - the time interval in milliseconds to trigger the next attempt to stop the route
    • routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
    • routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
  • For Suspending routes on a Schedule
    • routeSuspendDate - the initial scheduled Date and time for route suspension
    • routeSuspendRepeatCount - no of times to repeat the job
    • routeSuspendRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
  • For Resuming routes on a Schedule
    • routeResumeDate - the initial scheduled Date and time for route suspension
    • routeResumeRepeatCount - no of times to repeat the job
    • routeResumeRepeatInterval - the time interval in milliseconds to trigger the next attempt to start the route
Given below are examples using a SimpleScheduledRoutePolicy

@Test
public void testScheduledStartRoutePolicy() throws Exception {
    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");        
        
    success.expectedMessageCount(1);
        
    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {   
            SimpleScheduledRoutePolicy policy = new SimpleScheduledRoutePolicy();
            long startTime = System.currentTimeMillis() + 3000L;
            policy.setRouteStartDate(new Date(startTime));
            policy.setRouteStartRepeatCount(1);
            policy.setRouteStartRepeatInterval(3000);
                
            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
        
    Thread.currentThread().sleep(5000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}

CronScheduledRoutePolicy

In order to use a CronScheduledRoutePolicy it is necessary to instantiate an object of the type org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy.  The following property values must be set on it to be useful
  • For Starting routes on a Schedule
    • routeStartTime - the initial scheduled Date and time as a Cron Expression for route start
  • For Stopping routes on a Schedule
    • routeStopTime - the initial scheduled Date and time as a Cron Expression for route stop
    • routeStopGracePeriod - the time period to wait before initiating graceful route stop (set to 10 seconds by default)
    • routeStopTimeUnit - the time unit for the grace period expressed as java.util.concurrent.TimeUnit (default value is TimeUnit.MILLISECONDS)
  • For Suspending routes on a Schedule
    • routeSuspendTime - the initial scheduled Date and time as a Cron Expression for route suspension
  • For Resuming routes on a Schedule
    • routeResumeTime - the initial scheduled Date and time as a Cron Expression for route resumption
Given below are examples using a CronScheduledRoutePolicy

@Test
public void testScheduledStartRoutePolicy() throws Exception {

    MockEndpoint success = (MockEndpoint) context.getEndpoint("mock:success");        
        
    success.expectedMessageCount(1);
        
    context.getComponent("quartz", QuartzComponent.class).setPropertiesFile("org/apache/camel/routepolicy/quartz/myquartz.properties");
    context.getComponent("quartz", QuartzComponent.class).start();
    context.addRoutes(new RouteBuilder() {
        public void configure() {    
            CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
            policy.setRouteStartTime("*/3 * * * * ?");
                
            from("direct:start")
                .routeId("test")
                .routePolicy(policy)
                .to("mock:success");
        }
    });
    context.start();
    context.stopRoute("test", 0, TimeUnit.MILLISECONDS);
        
    Thread.currentThread().sleep(4000);
    assertTrue(context.getRouteStatus("test") == ServiceStatus.Started);
    template.sendBody("direct:start", "Ready or not, Here, I come");

    context.getComponent("quartz", QuartzComponent.class).stop();
    success.assertIsSatisfied();
}

Complete Examples

SimpleScheduledRoutePolicyTest.java
CronScheduledRoutePolicyTest.java

Further Details

https://issues.apache.org/activemq/browse/CAMEL-2936

Friday, September 3, 2010

Overriding a Property value set in a properties file with a JVM System Property

As of Camel 2.5, a new capability now exists in the most excellent and quite useful Camel Properties component to override a property value at runtime using a JVM System property without the need to restart the application to pick up the change.
This may be accomplished by overriding a property value by creating or editing a JVM System property of the same name as the property it replaces.

An example of how this can be done is given below

PropertiesComponent pc = context.getComponent("properties", PropertiesComponent.class);
pc.setCache(false);
        
System.setProperty("cool.end", "mock:override");
System.setProperty("cool.result", "override");

context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("direct:start").to("properties:cool.end");
        from("direct:foo").to("properties:mock:{{cool.result}}");
    }
});
context.start();

getMockEndpoint("mock:override").expectedMessageCount(2);

template.sendBody("direct:start", "Hello World");
template.sendBody("direct:foo", "Hello Foo");

System.clearProperty("cool.end");
System.clearProperty("cool.result");
        
assertMockEndpointsSatisfied();

Further Details
https://issues.apache.org/activemq/browse/CAMEL-2791

Monday, August 23, 2010

A Camel SIP component for Publishing/Subscribing to a SIP Presence Agent on Camel routes

I have committed a Camel SIP component (based on the Jain SIP Implementation) which allows for simple wiring of SIP based endpoints on a camel route that are capable of communication (publish/subscribe) with a SIP Presence Agent.

A SIP Presence Agent is a software entity that facilitates sending of event notifications to event subscribers and receiving of event publications from an event publisher.

The SIP component supports the creation of Producer (Event Publisher) and Consumer (Event Subscriber) endpoints which can
  • register with a Presence Agent
  • In case of producers
    • publish events and notifications to the Presence Agent
  • In case of consumers,
    • subscribe to the presence agent expressing interest in a specific event
    • receive notifications from the presence agent at periodic intervals following state changes.
SIP is a peer-to-peer communication protocol which requires a SIP Stack with a listener to be instantiated on both the SIP Producer and Consumer (using separate ports if using localhost). This is necessary in order to support the handshakes & acknowledgements exchanged between the SIP Stacks during communication.

Configuring the SIP Endpoint URI

The SIP protocol requires a number of headers to be sent along with the request in order to provide information about the sender, intermediaries, circuit details, call sequence etc. These headers may be injected into the camel context registry to be then looked up by the endpoint URI for injection into the publish/subscribe activity by the SIP Producer or Consumer.

The Mandatory Headers included in any SIP call are
  • FromHeader - Mandatory Header containing details regarding the message originator. 
  • ViaHeader(s) - Header tracking the entities which received and forwarded the message to the next hop
  • ContentTypeHeader - Header containing information regarding the encoding of the Payload. SIP is also capable of sending audio and many different binary and encoded formats.
  • CallIdHeader - Header containing the conversation id which is updated each time a producer or consumer participate in a conversation (i.e. use the same dialog for an interaction).
  • MaxForwardsHeader - Header specifying the maximum number of entities that may be used as 'Via' entities during communication
The Optional Headers included in a SIP call are
  • EventHeader - Used by our Producer (Event Publisher) and Consumer (Event Subscriber) to provide the Presence Agent with details about the Event that they are interested in.
  • ContactHeader - Header containing human readable contact details & information about the message sender 
  • ExpiresHeader - Header containing information regarding message expiry. Expired messages are discarded by the Consumer (Event Subscriber).
  • ExtensionHeader - A Header available for adding any custom information to be sent to the message receiver.   

In addition to headers the SIP URI may be further configured with the following details
  • stackName - Must be a unique value for each endpoint. Name of the SIP Listener listening on a given host/port
  • transport - Communication protocol. Must be TCP or UDP
  • maxMessageSize - Maximum size for a given Message
  • cacheConnections - Whether connections are cached by the SIP Stack
  • automaticDialog - Whether every communication is done through an auto-created Dialog.
  • contentType - the MIME Type for the payload sent on the wire
  • receiveTimeout -  The amount of time between sending a message and receiving an acknowledgement from the Presence Agent or Consumer (Event Subscriber). If there is no acknowledgement, retries may be attempted or the client may abort or cancel further retries. 
  • msgExpiration - The message expiration value that is matched against the incoming value in the ExpiresHeader . Used to check whether a message is considered as expired.

Given these possible configuration values a SipEndpoint URI may be configured in the following way
// Simple Instantiation
from("sip://johndoe@localhost:5154" +  
     "?stackName=Subscriber" +   
     "&toUser=agent" +
     "&toHost=localhost" +
     "&toPort=5152" +
     "&eventHeaderName=evtHdrName" +
     "&eventId=evtid");

//or using Camel Context Registry based Headers
from("sips://johndoe@localhost:5154" +  
     "?stackName=Subscriber2" +   
     "&fromHeader=#fromDetails" +
     "&toHeader=#toDetails" +
     "&viaHeader=#viaDetails" +
     "&contentTypeHeader=#contentTypeDetails" +
     "&callIdHeader =#callIdDetails" +
     "&maxForwardsHeader =#maxForwardsDetails" +
     "&eventHeader=#eventDetails" +
     "&contactHeader=#contactDetails" +
     "&expiresHeader=#expiresDetails" +
     "&extensionHeader=#extensionDetails" +
     "&eventId=evtid");

Creating a Camel SIP Publisher

In the example below, a SIP Publisher is created to send SIP Event publications to
  • a user "agent@localhost:5152". This is the address of the SIP Presence Agent which acts as a broker between the SIP Publisher and Subscriber
  • using a SIP Stack named client
  • using a registry based eventHeader called evtHdrName
  • using a registry based eventId called evtId
  • from a SIP Stack with Listener set up as user2@localhost:3534
  • The Event being published is EVENT_A
  • A Mandatory Header called REQUEST_METHOD is set to Request.Publish thereby setting up the endpoint as a Event pubisher"
producerTemplate.sendBodyAndHeader(
    "sip://agent@localhost:5152?stackName=client&eventHeaderName=evtHdrName&eventId=evtid&fromUser=user2&fromHost=localhost&fromPort=3534", 
    "EVENT_A",
    "REQUEST_METHOD", 
    Request.PUBLISH);

Creating a Camel SIP Subscriber

In the example below, a SIP Subscriber is created to receive SIP Event publications sent to
  • a user "johndoe@localhost:5154"
  • using a SIP Stack named Subscriber
  • registering with a Presence Agent user called agent@localhost:5152
  • using a registry based eventHeader called evtHdrName. The evtHdrName contains the Event which is se to "Event_A"
  • using a registry based eventId called evtId
@Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {  
                // Create PresenceAgent
                from("sip://agent@localhost:5152?stackName=PresenceAgent&presenceAgent=true&eventHeaderName=evtHdrName&eventId=evtid")
                    .to("mock:neverland");
                
                // Create Sip Consumer(Event Subscriber)
                from("sip://johndoe@localhost:5154?stackName=Subscriber&toUser=agent&toHost=localhost&toPort=5152&eventHeaderName=evtHdrName&eventId=evtid")
                    .to("log:ReceivedEvent?level=DEBUG")
                    .to("mock:notification");
                
            }
        };
    }

The Camel SIP component also ships with a Presence Agent that is meant to be used for Testing and Demo purposes only. An example of instantiating a Presence Agent is given above. Note that the Presence Agent is set up as a user agent@localhost:5152 and is capable of communicating with both Publisher as well as Subscriber. It has a separate SIP stackName distinct from Publisher as well as Subscriber. While it is set up as a Camel Consumer, it does not actually send any messages along the route to the endpoint "mock:neverland".

Complete Examples

PublishSubscribeTest

Further Details

https://issues.apache.org/activemq/browse/CAMEL-2943

Thursday, August 19, 2010

Customizing Netty Endpoints using Custom Pipeline Factories

I have added an enhancement to the Netty component in Camel 2.5.0 that allows Netty Endpoints to be completely configurable by providing an ability to add a custom channel pipeline factories on both Producer and Consumer endpoints.

This capability facilitates the creation of custom channel pipelines. Custom channel pipelines provide complete control to the user over the handler/interceptor chain by inserting custom handler(s), encoder(s) & decoders without having to specify them in the Netty Endpoint URL in a very simple way.

Given below is a step by step procedure for incorporating custom channel pipeline factories into a Netty Endpoint

Step 1: Create the custom channel pipeline factory

An example of a client-side and server-side channel pipeline factories is given below. In this example below please note the following
  • A Producer linked channel pipeline factory must extend the abstract class ClientPipelineFactory
  • A Consumer linked channel pipeline factory must extend the abstract class ServerPipelineFactory
  • The classes can optionally override the getPipeline() method in order to insert custom handler(s), encoder(s) and decoder(s) 

public class SampleClientChannelPipelineFactory extends ClientPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;
        
    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;
            
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));            
        channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));

        return channelPipeline;

    }
        
    public boolean isfactoryInvoked() {
        return invoked;
    }
}

public class SampleServerChannelPipelineFactory extends ServerPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;
        
    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;
            
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));

        return channelPipeline;
    }
        
    public boolean isfactoryInvoked() {
        return invoked;
    }      
}

Step 2: Register the custom channel pipeline factory with the Camel Context

In order to register the custom channel pipeline factory you need to do the following
protected void addPipelineFactoryToRegistry() throws Exception {
    Registry registry = camelContext.getRegistry();
    clientPipelineFactory = new TestClientChannelPipelineFactory();
    serverPipelineFactory = new TestServerChannelPipelineFactory();
    registry.bind("cpf", clientPipelineFactory);
    registry.bind("spf", serverPipelineFactory);
}

Step 3: Instructing the Netty Endpoint to utilize the Camel Context registered channel pipeline factory

Netty Producers endpoints may now be configured to use the pipeline factories as follows.
private void sendRequest() throws Exception {
    // Async request
    response = (String) producerTemplate.requestBody(
        "netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true", 
        "Forest Gump describing Vietnam...");        
}

Netty Consumer endoints may be configured as follows
protected Context createJndiContext() throws Exception {
    Properties properties = new Properties();

    // jndi.properties is optional
    InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
    if (in != null) {
        log.debug("Using jndi.properties from classpath root");
        properties.load(in);
    } else {            
        properties.put("java.naming.factory.initial", "org.apache.camel.util.jndi.CamelInitialContextFactory");
    }
    return new InitialContext(new Hashtable(properties));
}

public void createCamelRoute() throws Exception {
    CamelContext context = new DefaultCamelContext(new JndiRegistry(createJndiContext()); 
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&textline=true")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        exchange.getOut().setBody("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'");                           
                    }
                });                
        }
    });
    context.start();
}

Sample code

NettyCustomPipelineFactoryAsynchTest.java
NettyCustomPipelineFactorySynchTest.java

Further details
https://issues.apache.org/activemq/browse/CAMEL-2713