Thursday, August 19, 2010

Customizing Netty Endpoints using Custom Pipeline Factories

I have added an enhancement to the Netty component in Camel 2.5.0 that allows Netty Endpoints to be completely configurable by providing an ability to add a custom channel pipeline factories on both Producer and Consumer endpoints.

This capability facilitates the creation of custom channel pipelines. Custom channel pipelines provide complete control to the user over the handler/interceptor chain by inserting custom handler(s), encoder(s) & decoders without having to specify them in the Netty Endpoint URL in a very simple way.

Given below is a step by step procedure for incorporating custom channel pipeline factories into a Netty Endpoint

Step 1: Create the custom channel pipeline factory

An example of a client-side and server-side channel pipeline factories is given below. In this example below please note the following
  • A Producer linked channel pipeline factory must extend the abstract class ClientPipelineFactory
  • A Consumer linked channel pipeline factory must extend the abstract class ServerPipelineFactory
  • The classes can optionally override the getPipeline() method in order to insert custom handler(s), encoder(s) and decoder(s) 

public class SampleClientChannelPipelineFactory extends ClientPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;
        
    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;
            
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));            
        channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));

        return channelPipeline;

    }
        
    public boolean isfactoryInvoked() {
        return invoked;
    }
}

public class SampleServerChannelPipelineFactory extends ServerPipelineFactory {
    private int maxLineSize = 1024;
    private boolean invoked;
        
    public ChannelPipeline getPipeline() throws Exception {
        invoked = true;
            
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));

        return channelPipeline;
    }
        
    public boolean isfactoryInvoked() {
        return invoked;
    }      
}

Step 2: Register the custom channel pipeline factory with the Camel Context

In order to register the custom channel pipeline factory you need to do the following
protected void addPipelineFactoryToRegistry() throws Exception {
    Registry registry = camelContext.getRegistry();
    clientPipelineFactory = new TestClientChannelPipelineFactory();
    serverPipelineFactory = new TestServerChannelPipelineFactory();
    registry.bind("cpf", clientPipelineFactory);
    registry.bind("spf", serverPipelineFactory);
}

Step 3: Instructing the Netty Endpoint to utilize the Camel Context registered channel pipeline factory

Netty Producers endpoints may now be configured to use the pipeline factories as follows.
private void sendRequest() throws Exception {
    // Async request
    response = (String) producerTemplate.requestBody(
        "netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true", 
        "Forest Gump describing Vietnam...");        
}

Netty Consumer endoints may be configured as follows
protected Context createJndiContext() throws Exception {
    Properties properties = new Properties();

    // jndi.properties is optional
    InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
    if (in != null) {
        log.debug("Using jndi.properties from classpath root");
        properties.load(in);
    } else {            
        properties.put("java.naming.factory.initial", "org.apache.camel.util.jndi.CamelInitialContextFactory");
    }
    return new InitialContext(new Hashtable(properties));
}

public void createCamelRoute() throws Exception {
    CamelContext context = new DefaultCamelContext(new JndiRegistry(createJndiContext()); 
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&textline=true")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        exchange.getOut().setBody("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'");                           
                    }
                });                
        }
    });
    context.start();
}

Sample code

NettyCustomPipelineFactoryAsynchTest.java
NettyCustomPipelineFactorySynchTest.java

Further details
https://issues.apache.org/activemq/browse/CAMEL-2713

1 comment:

WhiteDonkey said...

Excellent post, there are few posts about using Netty Channnel Pipeline on camel.

I have a question

Why did you put textline=true, when you write the encoders/decoders in the pipeline ????