Wednesday, March 31, 2010

Developed a Socket based communication component (TCP/UDP) for Camel using JBoss Netty

Last week, I performed a major commit to the Apache Camel code stream with a new socket communication component based on the JBoss Netty community offering (available under an Apache 2.0 license). This new component supports both TCP and UDP(multicast) communication including SSL support over both protocols.

The component has several options and allows fine-grained control on a number of TCP/UDP communication parameters (buffer sizes, keepAlives, tcpNoDelay etc) and offers both In-Only and In-Out communication on a Camel route. The camel-netty component also support several different types of payloads including Binary, XML, Text and serialized Objects.

http://issues.apache.org/activemq/browse/CAMEL-2371

The component works as follows

Camel Netty routes using a UDP endpoint in request-reply mode
context.addRoutes(new RouteBuilder() {
public void configure() {
  from("netty:udp://localhost:5155?sync=true")
    .process(new Processor() {
       public void process(Exchange exchange) throws Exception {
         Poetry poetry = (Poetry) exchange.getIn().getBody();
         poetry.setPoet("Dr. Sarojini Naidu");
         exchange.getOut().setBody(poetry);
       }
     }
}   
});

Camel Netty routes using a TCP endpoint in one-way mode
context.addRoutes(new RouteBuilder() {
public void configure() {
  from("netty:tcp://localhost:5150")
    .to("mock:result");   
}
});

In the above example, note that
  • the first Camel route uses the UDP protocol with a datagram socket listener on port 5155 and responds to the route invoking client with a populated serializable Poetry response object. (due to sync=true)
  • the second Camel route uses the TCP protocol with a socket listener set up on port 5150 and is designed to receive a one-way request from an invoking client

There are a number of URI parameter settings that do the following
  • keepAlive: boolean setting to ensure socket is not closed due to inactivity
  • tcpNoDelay: boolean setting to improve TCP protocol performance
  • broadcast: boolean setting to choose Multicast over UDP
  • connectTimeoutMillis: time to wait for a socket connection to be available
  • receiveTimeoutMillis: time to wait for a response to be received on a connection
  • reuseAddress: boolean setting to facilitate socket multiplexing
  • sync: boolean setting to set endpoint as one-way or request-response
  • ssl: boolean setting to specify whether SSL encrytion is applied to this endpoint
  • sendBufferSize: the TCP/UDP buffer sizes to be used during outbound communication
  • receiveBufferSize: the TCP/UDP buffer sizes to be used during inbound communication
  • corePoolSize: the number of allocated threads at startup. Defaults to 10
  • maxPoolSize: the maximum number of threads that may be allocated to this endpoint. Defaults to 100

Codec Handlers and SSL Keystores can be set via a JNDI Registry that is associated with the Camel Context. The values that could be passed in, are the following
  • passphrase: password setting to use in order to encrypt/decrypt payloads sent using SSH
  • keyStoreFile: Client side certificate keystore to be used for encryption
  • trustStoreFile: Server side certificate keystore to be used for encryption
  • sslHandler: Reference to a class that could be used to return an SSL Handler
  • encoder: A custom Handler class that can be used to perform special marshalling of outbound payloads. Must override org.jboss.netty.channel.ChannelDownStreamHandler
  • decoder: A custom Handler class that can be used to perform special marshalling of inbound payloads. Must override org.jboss.netty.channel.ChannelUpStreamHandler
  • handler: A custom Handler class that can be used to perform custom processing of Netty events triggered during communication. Must override org.jboss.netty.channel.SimpleChannelHandler

Some other examples of using the camel-netty component are

Using SSL with a Netty based endpoint
JndiRegistry registry = new JndiRegistry(createJndiContext());
registry.bind("password", "changeit");
registry.bind("keyStoreFile", new File("src/test/resources/keystore.jks"));
registry.bind("trustStoreFile", new File("src/test/resources/keystore.jks"));
context.createRegistry(registry);
context.addRoutes(new RouteBuilder() {
public void configure() {
  from("netty:tcp://localhost:5150sync=true&ssl=true&passphrase=#password&keyStoreFile=#ksf&trustStoreFile=#tsf")
     .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
          exchange.getOut().setBody("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.");                           
        }
     }
}
});