This capability lets you build custom channel pipelines. A custom channel pipeline gives you complete control over the handler/interceptor chain: you insert your own handlers, encoders and decoders directly, without having to specify them in the Netty endpoint URL.
The steps below show how to plug a custom channel pipeline factory into a Netty endpoint.
Step 1: Create the custom channel pipeline factory
Examples of a client-side and a server-side channel pipeline factory are given below. Note the following:
- A producer-linked channel pipeline factory must extend the abstract class ClientPipelineFactory
- A consumer-linked channel pipeline factory must extend the abstract class ServerPipelineFactory
- The classes can optionally override the getPipeline() method to insert custom handlers, encoders and decoders
    public class SampleClientChannelPipelineFactory extends ClientPipelineFactory {
        private int maxLineSize = 1024;
        private boolean invoked;

        public ChannelPipeline getPipeline() throws Exception {
            invoked = true;

            ChannelPipeline channelPipeline = Channels.pipeline();

            channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
            channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
            channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
            channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));

            return channelPipeline;
        }

        public boolean isfactoryInvoked() {
            return invoked;
        }
    }
    public class SampleServerChannelPipelineFactory extends ServerPipelineFactory {
        private int maxLineSize = 1024;
        private boolean invoked;

        public ChannelPipeline getPipeline() throws Exception {
            invoked = true;

            ChannelPipeline channelPipeline = Channels.pipeline();

            channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
            channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
            channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
            channelPipeline.addLast("handler", new ServerChannelHandler(consumer));

            return channelPipeline;
        }

        public boolean isfactoryInvoked() {
            return invoked;
        }
    }
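To see why "decoder-DELIM" is added ahead of "decoder-SD", here is a minimal stand-alone model of that inbound chain. It does not use Netty: LineFramingSketch, splitFrames and decode are hypothetical names chosen for illustration, and the behavior only approximates a line-delimiter frame decoder with delimiter stripping followed by a string decoder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, Netty-free model of the inbound half of the pipelines above.
// All names are hypothetical; this is not Netty's implementation.
class LineFramingSketch {

    // Stage 1 (stands in for "decoder-DELIM"): cut the raw bytes into frames at
    // each '\n', dropping the delimiter and an optional preceding '\r'.
    static List<byte[]> splitFrames(byte[] raw, int maxLineSize) {
        List<byte[]> frames = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < raw.length; i++) {
            if (raw[i] != '\n') {
                continue;
            }
            int end = (i > start && raw[i - 1] == '\r') ? i - 1 : i;
            if (end - start > maxLineSize) {
                throw new IllegalArgumentException("frame longer than " + maxLineSize + " bytes");
            }
            frames.add(Arrays.copyOfRange(raw, start, end));
            start = i + 1;
        }
        // Bytes after the last delimiter form an incomplete frame. A real decoder
        // would buffer them until more data arrives; this sketch drops them.
        return frames;
    }

    // Stage 2 (stands in for "decoder-SD"): turn each complete frame into a String.
    static List<String> decode(List<byte[]> frames) {
        List<String> lines = new ArrayList<>();
        for (byte[] frame : frames) {
            lines.add(new String(frame, StandardCharsets.UTF_8));
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] wire = "hello\r\nworld\npartial".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(splitFrames(wire, 1024))); // prints [hello, world]
    }
}
```

Because the string stage only ever sees complete frames, each message arrives at the final handler as one whole line, regardless of how the bytes were chunked on the wire.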
Step 2: Register the custom channel pipeline factory with the Camel Context
To register the custom channel pipeline factories, bind them in the registry of the Camel Context:
    protected void addPipelineFactoryToRegistry() throws Exception {
        Registry registry = camelContext.getRegistry();

        // Instantiate the factories created in Step 1
        clientPipelineFactory = new SampleClientChannelPipelineFactory();
        serverPipelineFactory = new SampleServerChannelPipelineFactory();

        // The names "cpf" and "spf" are referenced later from the endpoint URLs
        registry.bind("cpf", clientPipelineFactory);
        registry.bind("spf", serverPipelineFactory);
    }
Step 3: Instruct the Netty Endpoint to use the registered channel pipeline factory
Netty Producer endpoints can now be configured to use the pipeline factory as follows:
    private void sendRequest() throws Exception {
        // Async request
        response = (String) producerTemplate.requestBody(
            "netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true",
            "Forrest Gump describing Vietnam...");
    }
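The #cpf value in the endpoint URL is a reference to the object bound under the name cpf in Step 2, not a literal string. The following stand-alone sketch models that lookup with a plain Map. It is not Camel's resolution code; RegistryRefSketch, parseOptions and resolve are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of "#name" option resolution; not Camel's implementation.
class RegistryRefSketch {

    // Split the query part of an endpoint URL into option name/value pairs.
    static Map<String, String> parseOptions(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0) {
            return options;
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    // A value starting with '#' names a registry entry; anything else is a literal.
    static Object resolve(String value, Map<String, Object> registry) {
        if (!value.startsWith("#")) {
            return value;
        }
        String name = value.substring(1);
        Object bean = registry.get(name);
        if (bean == null) {
            throw new IllegalArgumentException("No registry entry named " + name);
        }
        return bean;
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new LinkedHashMap<>();
        Object factory = new Object(); // placeholder for the client pipeline factory
        registry.put("cpf", factory);

        Map<String, String> options =
            parseOptions("netty:tcp://localhost:5110?clientPipelineFactory=#cpf&textline=true");

        // The reference resolves to the very object that was bound.
        System.out.println(resolve(options.get("clientPipelineFactory"), registry) == factory); // prints true
        // A plain value is passed through unchanged.
        System.out.println(resolve(options.get("textline"), registry)); // prints true
    }
}
```

In this model, a misspelled name such as #cfp fails at lookup time instead of silently falling back to a default pipeline, which is a reasonable behavior to expect from a reference-style option.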
Netty Consumer endpoints can be configured as follows:
    protected Context createJndiContext() throws Exception {
        Properties properties = new Properties();

        // jndi.properties is optional
        InputStream in = getClass().getClassLoader().getResourceAsStream("jndi.properties");
        if (in != null) {
            log.debug("Using jndi.properties from classpath root");
            properties.load(in);
        } else {
            properties.put("java.naming.factory.initial",
                "org.apache.camel.util.jndi.CamelInitialContextFactory");
        }
        return new InitialContext(new Hashtable(properties));
    }

    public void createCamelRoute() throws Exception {
        CamelContext context = new DefaultCamelContext(new JndiRegistry(createJndiContext()));
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("netty:tcp://localhost:5110?serverPipelineFactory=#spf&textline=true")
                    .process(new Processor() {
                        public void process(Exchange exchange) throws Exception {
                            exchange.getOut().setBody("Forrest Gump: We was always taking long walks, and we was always looking for a guy named 'Charlie'");
                        }
                    });
            }
        });
        context.start();
    }
Sample code
NettyCustomPipelineFactoryAsynchTest.java
NettyCustomPipelineFactorySynchTest.java
Further details
https://issues.apache.org/activemq/browse/CAMEL-2713
1 comment:
Excellent post; there are few posts about using a Netty channel pipeline with Camel.
I have a question:
Why did you set textline=true when you already add the encoders/decoders in the pipeline?