Monday, August 23, 2010

A Camel SIP component for Publishing/Subscribing to a SIP Presence Agent on Camel routes

I have committed a Camel SIP component (based on the Jain SIP Implementation) which allows for simple wiring of SIP based endpoints on a camel route that are capable of communication (publish/subscribe) with a SIP Presence Agent.

A SIP Presence Agent is a software entity that facilitates sending of event notifications to event subscribers and receiving of event publications from an event publisher.

The SIP component supports the creation of Producer (Event Publisher) and Consumer (Event Subscriber) endpoints which can
  • register with a Presence Agent
  • In case of producers
    • publish events and notifications to the Presence Agent
  • In case of consumers,
    • subscribe to the presence agent expressing interest in a specific event
    • receive notifications from the presence agent at periodic intervals following state changes.
SIP is a peer-to-peer communication protocol which requires a SIP Stack with a listener to be instantiated on both the SIP Producer and Consumer (using separate ports if using localhost). This is necessary in order to support the handshakes & acknowledgements exchanged between the SIP Stacks during communication.

Configuring the SIP Endpoint URI

The SIP protocol requires a number of headers to be sent along with the request in order to provide information about the sender, intermediaries, circuit details, call sequence etc. These headers may be injected into the camel context registry to be then looked up by the endpoint URI for injection into the publish/subscribe activity by the SIP Producer or Consumer.

The Mandatory Headers included in any SIP call are
  • FromHeader - Mandatory Header containing details regarding the message originator. 
  • ViaHeader(s) - Header tracking the entities which received and forwarded the message to the next hop
  • ContentTypeHeader - Header containing information regarding the encoding of the Payload. SIP is also capable of sending audio and many different binary and encoded formats.
  • CallIdHeader - Header containing the conversation id which is updated each time a producer or consumer participate in a conversation (i.e. use the same dialog for an interaction).
  • MaxForwardsHeader - Header specifying the maximum number of entities that may be used as 'Via' entities during communication
The Optional Headers included in a SIP call are
  • EventHeader - Used by our Producer (Event Publisher) and Consumer (Event Subscriber) to provide the Presence Agent with details about the Event that they are interested in.
  • ContactHeader - Header containing human readable contact details & information about the message sender 
  • ExpiresHeader - Header containing information regarding message expiry. Expired messages are discarded by the Consumer (Event Subscriber).
  • ExtensionHeader - A Header available for adding any custom information to be sent to the message receiver.   

In addition to headers the SIP URI may be further configured with the following details
  • stackName - Must be a unique value for each endpoint. Name of the SIP Listener listening on a given host/port
  • transport - Communication protocol. Must be TCP or UDP
  • maxMessageSize - Maximum size for a given Message
  • cacheConnections - Whether connections are cached by the SIP Stack
  • automaticDialog - Whether every communication is done through an auto-created Dialog.
  • contentType - the MIME Type for the payload sent on the wire
  • receiveTimeout -  The amount of time between sending a message and receiving an acknowledgement from the Presence Agent or Consumer (Event Subscriber). If there is no acknowledgement, retries may be attempted or the client may abort or cancel further retries. 
  • msgExpiration - The message expiration value that is matched against the incoming value in the ExpiresHeader . Used to check whether a message is considered as expired.

Given these possible configuration values a SipEndpoint URI may be configured in the following way
// Simple Instantiation
from("sip://johndoe@localhost:5154" +  
     "?stackName=Subscriber" +   
     "&toUser=agent" +
     "&toHost=localhost" +
     "&toPort=5152" +
     "&eventHeaderName=evtHdrName" +
     "&eventId=evtid");

//or using Camel Context Registry based Headers
from("sips://johndoe@localhost:5154" +  
     "?stackName=Subscriber2" +   
     "&fromHeader=#fromDetails" +
     "&toHeader=#toDetails" +
     "&viaHeader=#viaDetails" +
     "&contentTypeHeader=#contentTypeDetails" +
     "&callIdHeader =#callIdDetails" +
     "&maxForwardsHeader =#maxForwardsDetails" +
     "&eventHeader=#eventDetails" +
     "&contactHeader=#contactDetails" +
     "&expiresHeader=#expiresDetails" +
     "&extensionHeader=#extensionDetails" +
     "&eventId=evtid");

Creating a Camel SIP Publisher

In the example below, a SIP Publisher is created to send SIP Event publications to
  • a user "agent@localhost:5152". This is the address of the SIP Presence Agent which acts as a broker between the SIP Publisher and Subscriber
  • using a SIP Stack named client
  • using a registry based eventHeader called evtHdrName
  • using a registry based eventId called evtId
  • from a SIP Stack with Listener set up as user2@localhost:3534
  • The Event being published is EVENT_A
  • A Mandatory Header called REQUEST_METHOD is set to Request.Publish thereby setting up the endpoint as a Event pubisher"
producerTemplate.sendBodyAndHeader(
    "sip://agent@localhost:5152?stackName=client&eventHeaderName=evtHdrName&eventId=evtid&fromUser=user2&fromHost=localhost&fromPort=3534", 
    "EVENT_A",
    "REQUEST_METHOD", 
    Request.PUBLISH);

Creating a Camel SIP Subscriber

In the example below, a SIP Subscriber is created to receive SIP Event publications sent to
  • a user "johndoe@localhost:5154"
  • using a SIP Stack named Subscriber
  • registering with a Presence Agent user called agent@localhost:5152
  • using a registry based eventHeader called evtHdrName. The evtHdrName contains the Event which is se to "Event_A"
  • using a registry based eventId called evtId
@Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {  
                // Create PresenceAgent
                from("sip://agent@localhost:5152?stackName=PresenceAgent&presenceAgent=true&eventHeaderName=evtHdrName&eventId=evtid")
                    .to("mock:neverland");
                
                // Create Sip Consumer(Event Subscriber)
                from("sip://johndoe@localhost:5154?stackName=Subscriber&toUser=agent&toHost=localhost&toPort=5152&eventHeaderName=evtHdrName&eventId=evtid")
                    .to("log:ReceivedEvent?level=DEBUG")
                    .to("mock:notification");
                
            }
        };
    }

The Camel SIP component also ships with a Presence Agent that is meant to be used for Testing and Demo purposes only. An example of instantiating a Presence Agent is given above. Note that the Presence Agent is set up as a user agent@localhost:5152 and is capable of communicating with both Publisher as well as Subscriber. It has a separate SIP stackName distinct from Publisher as well as Subscriber. While it is set up as a Camel Consumer, it does not actually send any messages along the route to the endpoint "mock:neverland".

Complete Examples

PublishSubscribeTest

Further Details

https://issues.apache.org/activemq/browse/CAMEL-2943

Thursday, August 19, 2010

Customizing Netty Endpoints using Custom Pipeline Factories

I have added an enhancement to the Netty component in Camel 2.5.0 that allows Netty Endpoints to be completely configurable by providing an ability to add a custom channel pipeline factories on both Producer and Consumer endpoints.

This capability facilitates the creation of custom channel pipelines. Custom channel pipelines provide complete control to the user over the handler/interceptor chain by inserting custom handler(s), encoder(s) & decoders without having to specify them in the Netty Endpoint URL in a very simple way.

Given below is a step by step procedure for incorporating custom channel pipeline factories into a Netty Endpoint

Step 1: Create the custom channel pipeline factory

An example of a client-side and server-side channel pipeline factories is given below. In this example below please note the following
  • A Producer linked channel pipeline factory must extend the abstract class ClientPipelineFactory
  • A Consumer linked channel pipeline factory must extend the abstract class ServerPipelineFactory
  • The classes can optionally override the getPipeline() method in order to insert custom handler(s), encoder(s) and decoder(s) 

public class SampleClientChannelPipelineFactory extends ClientPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;
        
    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;
            
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));            
        channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));

        return channelPipeline;

    }
        
    public boolean isfactoryInvoked() {
        return invoked;
    }
}

public class SampleServerChannelPipelineFactory extends ServerPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;
        
    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;
            
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));

        return channelPipeline;
    }
        
    public boolean isfactoryInvoked() {
        return invoked;
    }      
}

Step 2: Register the custom channel pipeline factory with the Camel Context

In order to register the custom channel pipeline factory you need to do the following
protected void addPipelineFactoryToRegistry() throws Exception {
    Registry registry = camelContext.getRegistry();
    clientPipelineFactory = new TestClientChannelPipelineFactory();
    serverPipelineFactory = new TestServerChannelPipelineFactory();
    registry.bind("cpf", clientPipelineFactory);
    registry.bind("spf", serverPipelineFactory);
}

Step 3: Instructing the Netty Endpoint to utilize the Camel Context registered channel pipeline factory

Netty Producers endpoints may now be configured to use the pipeline factories as follows.
private void sendRequest() throws Exception {
    // Async request
    response = (String) producerTemplate.requestBody(
        "netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true", 
        "Forest Gump describing Vietnam...");        
}

Netty Consumer endoints may be configured as follows
protected Context createJndiContext() throws Exception {
    Properties properties = new Properties();

    // jndi.properties is optional
    InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
    if (in != null) {
        log.debug("Using jndi.properties from classpath root");
        properties.load(in);
    } else {            
        properties.put("java.naming.factory.initial", "org.apache.camel.util.jndi.CamelInitialContextFactory");
    }
    return new InitialContext(new Hashtable(properties));
}

public void createCamelRoute() throws Exception {
    CamelContext context = new DefaultCamelContext(new JndiRegistry(createJndiContext()); 
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&textline=true")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        exchange.getOut().setBody("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'");                           
                    }
                });                
        }
    });
    context.start();
}

Sample code

NettyCustomPipelineFactoryAsynchTest.java
NettyCustomPipelineFactorySynchTest.java

Further details
https://issues.apache.org/activemq/browse/CAMEL-2713