Description
1) When the list of destinations is known ahead of time and is static, you can add an element
to the route that will consume messages from a source endpoint and then send the message
out to a list of destinations. (accounting dept & production dept).
2) Using this multicasting technique, we can improve the speed of operations by sending
orders to accounting and production at the same time.
Example :
The incoming order are in the folder file:/venkatjavasource/camel/from.
First all these orders will submit to jms:xmlOrders jms queue.
From here all these orders are multicast to jms:accounting accounting queue and
jms:production production queue.
A multicast sends a message to a number of specified recipients.
1) Code that do multicast is,
from("jms:xmlOrders").multicast().to("jms:accounting", "jms:production");
We can mention JMS to list using comma separator.
UsingMulticastingInCamelRouters.java
package com.venkatjavasource.camel; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.impl.DefaultCamelContext; public class UsingMulticastingInCamelRouters { public static void main(String[] args) throws Exception { // Create CamelContext CamelContext context = new DefaultCamelContext(); // Connect to embedded ActiveMQ JMS broker ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "vm://localhost"); context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); // add our route to the CamelContext context.addRoutes(new RouteBuilder() { @Override public void configure() { // Content-based router from("file:/venkatjavasource/camel/from?noop=false").choice() .when(header("CamelFileName").endsWith(".xml")) .to("jms:xmlOrders") .when(header("CamelFileName").regex("^.*(csv|csl)$")) .to("jms:csvOrders").otherwise().to("jms:badOrders"); from("jms:xmlOrders").multicast().to("jms:accounting", "jms:production"); // test that our route is working from("jms:accounting").process(new Processor() { public void process(Exchange exchange) throws Exception { System.out.println("Accounting received order: " + exchange.getIn().getHeader("CamelFileName")); } }); from("jms:production").process(new Processor() { public void process(Exchange exchange) throws Exception { System.out.println("Production received order: " + exchange.getIn().getHeader("CamelFileName")); } }); } }); // start the route and let it do its work context.start(); Thread.sleep(2000); // stop the CamelContext context.stop(); } }
Demo :
We have one order with file name Order_xml.xml in file:/venkatjavasource/camel/from
Select “Run As” -> “Java Application”, Order_xml.xml order will be submitted to accounting & production jms queues at a time.
Out Put :
Accounting received order: Order_xml.xml
Production received order: Order_xml.xml
The order will be removed from source directory and back up has taken in same source directory.
PARALLEL MULTICASTING :
1) This will set up the multicast to distribute messages to the destinations in parallel.
from("jms:xmlOrders").multicast().parallelProcessing() .to("jms:accounting", "jms:production");
2) We can mention default thread pool size also,
ExecutorService executor = Executors.newFixedThreadPool(16); from("jms:xmlOrders").multicast() .parallelProcessing().executorService(executor) .to("jms:accounting", "jms:production");
3) By default, the multicast will continue sending messages to destinations even if one
fails.
STOPPING THE MULTICAST ON EXCEPTION :
1) We can use stopOnException feature of the multicast, to stop sending messages to destinations if it fails.
from("jms:xmlOrders") .multicast().stopOnException() .parallelProcessing().executorService(executor) .to("jms:accounting", "jms:production");
UsingMulticastingInCamelRouters.java
package com.venkatjavasource.camel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.impl.DefaultCamelContext; public class UsingMulticastingInCamelRouters { public static void main(String[] args) throws Exception { // Create CamelContext CamelContext context = new DefaultCamelContext(); // Connect to embedded ActiveMQ JMS broker ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "vm://localhost"); context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); // add our route to the CamelContext context.addRoutes(new RouteBuilder() { @Override public void configure() { // Content-based router from("file:/venkatjavasource/camel/from?noop=false").choice() .when(header("CamelFileName").endsWith(".xml")) .to("jms:xmlOrders") .when(header("CamelFileName").regex("^.*(csv|csl)$")) .to("jms:csvOrders").otherwise().to("jms:badOrders"); ExecutorService executor = Executors.newFixedThreadPool(14); from("jms:xmlOrders").multicast().stopOnException() .parallelProcessing().executorService(executor) .to("jms:accounting", "jms:production"); // test that our route is working from("jms:accounting").process(new Processor() { public void process(Exchange exchange) throws Exception { System.out.println("Accounting received order: " + exchange.getIn().getHeader("CamelFileName")); } }); from("jms:production").process(new Processor() { public void process(Exchange exchange) throws Exception { System.out.println("Production received order: " + exchange.getIn().getHeader("CamelFileName")); } }); } }); // start the route and let it do its work context.start(); Thread.sleep(2000); // stop the CamelContext context.stop(); } }
*** Venkat – Happy leaning ****